patterncsharpMinor
Concurrent non-blocking update of cached list of on-line users
Viewed 0 times
cachedupdatelinenonblockinglistusersconcurrent
Problem
I have the following problem: my server (ASP.NET MVC Web API) tracks when a client application ("Agent") is online. It stores this information in the following table:
The "Agent" software sends data to the server at some frequency; each HTTP request is treated as a ping/keep-alive and tells the server that the agent is up and running.
Algorithm is following:
The following implementation uses a non-blocking algorithm. I'd like you to review the correctness of this algorithm.
```
namespace X
{
[SingletonScope]
public class AgentPingReceiver : IAgentPingReceiver
{
private ILogger Log = Serilog.Log.Logger.ForContext();
private static readonly TimeSpan AgentTimeout = TimeSpan.FromMinutes(10);
private static readonly TimeSpan StateRefreshInterval = TimeSpan.FromMinutes(1);
private CancellationTokenSource _cancelToken;
ConcurrentDictionary _lastPingReceived;
public AgentPingReceiver()
{
_lastPingReceived = new ConcurrentDictionary();
_cancelToken = new CancellationTokenSource();
Task.Run((Func)BackgroundChecker,_cancelToken.Token);
}
private async Task BackgroundChecker()
+-------+-------------------------------+-----+
|AgentId| Date |State|
+-------+-------------------------------+-----+
| 1 | 30 may 2016 г. 3:02:20 +03:00 |True |
| 1 | 30 may 2016 г. 0:25:26 +02:00 |True |
| 1 |29 may 2016 г. 23:05:59 +02:00 |False|
| 1 |29 may 2016 г. 23:05:01 +02:00 |True |
+-------+-------------------------------+-----+
The "Agent" software sends data to the server at some frequency; each HTTP request is treated as a ping/keep-alive and tells the server that the agent is up and running.
Algorithm is following:
- Ping received, if previous ping time isn't stored in RAM cache:
- Write ping time into cache
- Add a DB record stating that the "Agent" came online
- Ping received, if previous ping time is stored in RAM cache:
- update ping time in cache
- Server hasn't received anything from the "Agent" for 10 minutes (AgentTimeout)
- remove this agent's record from cache
- add a record to the DB stating that the agent went offline
The following implementation uses a non-blocking algorithm. I'd like you to review the correctness of this algorithm.
```
namespace X
{
[SingletonScope]
public class AgentPingReceiver : IAgentPingReceiver
{
private ILogger Log = Serilog.Log.Logger.ForContext();
private static readonly TimeSpan AgentTimeout = TimeSpan.FromMinutes(10);
private static readonly TimeSpan StateRefreshInterval = TimeSpan.FromMinutes(1);
private CancellationTokenSource _cancelToken;
ConcurrentDictionary _lastPingReceived;
public AgentPingReceiver()
{
_lastPingReceived = new ConcurrentDictionary();
_cancelToken = new CancellationTokenSource();
Task.Run((Func)BackgroundChecker,_cancelToken.Token);
}
private async Task BackgroundChecker()
Solution
OK, it is going to be a long answer :)
UPDATE
Actually, marking for deletion is not necessary, see Little-known gems: Atomic conditional removals from ConcurrentDictionary.
I would still consider the rest of the answer as an approach to structure the task.
END OF UPDATE
Now let’s define abstractions for proper dependency management:
A little bit of "explicit language" - not necessary at all :)
We are defining one business object, it knows how to calculate expiration:
```
class Session : ValueObject
{
private static readonly TimeSpan Ttl = TimeSpan.FromMinutes(10);
public static Session Start(AgentId agentId, DateTimeOffset at) =>
new Session(agentId, at, at);
public Session(AgentId agentId, DateTimeOffset started, DateTimeOffset lastActivity)
{
AgentId = agentId;
Started = started;
LastActivity = lastActivity;
}
public AgentId AgentId { get; }
public DateTimeOffset Started { get; }
public DateTimeOffset LastActivity { get; }
UPDATE
Actually, marking for deletion is not necessary, see Little-known gems: Atomic conditional removals from ConcurrentDictionary.
I would still consider the rest of the answer as an approach to structure the task.
END OF UPDATE
ConcurrentDictionary - when you are about to remove an entry from it, first mark the entry for deletion to avoid race conditions. I defined this helper class to assist with this task:
class AsyncDictionary
where TValue : class
{
ConcurrentDictionary Values { get; } =
new ConcurrentDictionary();
public delegate bool TryUpdate(ref TValue value);
///
/// It makes multiple attempts to execute if item under the question is marked for deletion.
///
/// Well, the key.
/// Value factory for add opeartion.
/// It will try to update, but might reject to do so.
/// Reports actual operation executed and the value assigned.
public async Task AddOrUpdateAsync(TKey key, Func add, TryUpdate tryUpdate)
{
while (true)
{
var result = Result.Cancelled;
Values.AddOrUpdate(key,
(k) =>
{
result = new Result(ValueAction.Added, add());
return result.Value;
},
(k, v) =>
{
if (v == null)
result = Result.Delayed;
else
result = tryUpdate(ref v) ?
new Result(ValueAction.Updated, v) :
Result.Cancelled;
return v;
});
if (result.Action == ValueAction.Delayed)
await Task.Delay(10);
else
return result;
}
}
///
/// Concurrent remove first marks the entry for deletion by assigning null
/// to the value.
///
/// Well, the key.
/// Checks if we really want to kill it.
///
public Result Remove(TKey key, Predicate guard)
{
TValue value;
if (!Values.TryGetValue(key, out value))
return Result.Cancelled;
if (value == null)
return Result.Cancelled;
if (!guard(value))
return Result.Cancelled;
if (!Values.TryUpdate(key, null, value))
return Result.Cancelled;
TValue nullValue;
Values.TryRemove(key, out nullValue);
if (nullValue != null)
throw new NotImplementedException();
return new Result(ValueAction.Removed, value);
}
/// <summary>
/// Removes everything matching the predicate.
/// </summary>
/// <remarks>
/// The query is lazy: nothing is removed until the returned sequence is enumerated.
/// An entry whose value is <c>null</c> is a tombstone written by the single-key
/// Remove overload while a removal is still in progress on another thread; such
/// entries are skipped so that <paramref name="predicate"/> is never called with
/// <c>null</c>.
/// </remarks>
/// <param name="predicate">To be or not to be.</param>
/// <returns>The corpses.</returns>
public IEnumerable<Result> Remove(Predicate<TValue> predicate) =>
    from kvp in Values
    where kvp.Value != null && predicate(kvp.Value)
    select Remove(kvp.Key, predicate) into r
    where r.Action == ValueAction.Removed
    select r;
public struct Result
{
public static readonly Result Cancelled = new Result(ValueAction.Cancelled);
public static readonly Result Removed = new Result(ValueAction.Removed);
public static readonly Result Delayed = new Result(ValueAction.Delayed);
public Result(ValueAction action)
: this(action, null)
{
}
public Result(ValueAction action, TValue value)
{
Action = action;
Value = value;
}
public ValueAction Action { get; }
public TValue Value { get; }
}
}
enum ValueAction
{
Added,
Updated,
Removed,
Cancelled,
Delayed
}
Now let's define abstractions for proper dependency management:
interface IClock
{
DateTimeOffset GetTime();
}
interface ISessionMonitor : IDisposable
{
Task StartAsync(Session session);
Task EndAsync(Session session);
}
interface IPingMonitor : IDisposable
{
Task PingAsync(AgentId agentId);
}
A little bit of "explicit language" - not necessary at all :)
struct AgentId
{
public static implicit operator AgentId(int value) => new AgentId { Value = value };
public static implicit operator int(AgentId agentId) => agentId.Value;
int Value { get; set; }
public override string ToString() => $"Agent #{Value}";
}
We are defining one business object; it knows how to calculate its own expiration:
```
class Session : ValueObject
{
private static readonly TimeSpan Ttl = TimeSpan.FromMinutes(10);
public static Session Start(AgentId agentId, DateTimeOffset at) =>
new Session(agentId, at, at);
public Session(AgentId agentId, DateTimeOffset started, DateTimeOffset lastActivity)
{
AgentId = agentId;
Started = started;
LastActivity = lastActivity;
}
public AgentId AgentId { get; }
public DateTimeOffset Started { get; }
public DateTimeOffset LastActivity { get; }
Code Snippets
class AsyncDictionary<TKey, TValue>
where TValue : class
{
// Backing store. A null value is used as a "marked for deletion" tombstone
// (TValue is constrained to reference types by the class declaration).
ConcurrentDictionary<TKey, TValue> Values { get; } =
    new ConcurrentDictionary<TKey, TValue>();
// Update callback: may replace the value through the ref parameter and returns
// true when the update was applied, false when it refuses to update.
public delegate bool TryUpdate(ref TValue value);
/// <summary>
/// Adds a value for <paramref name="key"/> or updates the existing one. If the
/// existing entry is currently marked for deletion (its value is <c>null</c>),
/// the operation waits 10 ms and tries again until the entry is gone.
/// </summary>
/// <param name="key">Well, the key.</param>
/// <param name="add">Value factory for the add operation.</param>
/// <param name="tryUpdate">It will try to update, but might refuse to do so.</param>
/// <returns>
/// Reports the operation actually executed: <c>Added</c> or <c>Updated</c> with the
/// stored value, or <c>Cancelled</c> when <paramref name="tryUpdate"/> refused.
/// </returns>
public async Task<Result> AddOrUpdateAsync(TKey key, Func<TValue> add, TryUpdate tryUpdate)
{
    while (true)
    {
        // Default outcome; overwritten by whichever delegate below runs last.
        var result = Result.Cancelled;
        // NOTE(review): per the ConcurrentDictionary documentation the factories may
        // run more than once under contention; `result` then reflects the last run.
        Values.AddOrUpdate(key,
            (k) =>
            {
                // No entry for the key: create one and report it as Added.
                result = new Result(ValueAction.Added, add());
                return result.Value;
            },
            (k, v) =>
            {
                if (v == null)
                    // Tombstone: a removal is in flight; leave it alone and retry later.
                    result = Result.Delayed;
                else
                    // tryUpdate may replace v through the ref parameter.
                    result = tryUpdate(ref v) ?
                        new Result(ValueAction.Updated, v) :
                        Result.Cancelled;
                // Store the (possibly replaced, possibly unchanged) value back.
                return v;
            });
        if (result.Action == ValueAction.Delayed)
            // Back off briefly before looking at the entry again.
            await Task.Delay(10);
        else
            return result;
    }
}
/// <summary>
/// Removes the entry for <paramref name="key"/> in two steps: the value is first
/// swapped for <c>null</c> (a deletion mark), and only then is the key removed.
/// </summary>
/// <param name="key">Key of the entry to remove.</param>
/// <param name="guard">Must return true for the current value, otherwise nothing is removed.</param>
/// <returns>
/// <c>Removed</c> together with the value that was stored, or <c>Cancelled</c> when the
/// key is absent, already marked, rejected by the guard, or changed concurrently.
/// </returns>
public Result Remove(TKey key, Predicate<TValue> guard)
{
    TValue current;

    // Each step only runs if the previous one succeeded (short-circuit &&):
    // look up, skip tombstones, ask the guard, then mark by compare-and-swap to null.
    bool marked =
        Values.TryGetValue(key, out current)
        && current != null
        && guard(current)
        && Values.TryUpdate(key, null, current);

    if (!marked)
    {
        return Result.Cancelled;
    }

    TValue tombstone;
    Values.TryRemove(key, out tombstone);
    if (tombstone != null)
    {
        // The marked entry was replaced by a live value before it was removed.
        throw new NotImplementedException();
    }

    return new Result(ValueAction.Removed, current);
}
/// <summary>
/// Removes everything matching the predicate.
/// </summary>
/// <remarks>
/// The query is lazy: nothing is removed until the returned sequence is enumerated.
/// An entry whose value is <c>null</c> is a tombstone written by
/// Remove(TKey, Predicate) while a removal is still in progress on another thread;
/// such entries are skipped so that <paramref name="predicate"/> is never called
/// with <c>null</c>.
/// </remarks>
/// <param name="predicate">To be or not to be.</param>
/// <returns>The corpses: one <c>Removed</c> result per entry actually removed.</returns>
public IEnumerable<Result> Remove(Predicate<TValue> predicate) =>
    from kvp in Values
    where kvp.Value != null && predicate(kvp.Value)
    select Remove(kvp.Key, predicate) into r
    where r.Action == ValueAction.Removed
    select r;
public struct Result
{
public static interface IClock
{
DateTimeOffset GetTime();
}
/// <summary>
/// Receives session lifecycle notifications. PingMonitor calls
/// <see cref="StartAsync"/> when a session is added and <see cref="EndAsync"/>
/// when an expired session is removed.
/// </summary>
interface ISessionMonitor : IDisposable
{
    /// <summary>Notifies that <paramref name="session"/> has started.</summary>
    Task StartAsync(Session session);
    /// <summary>Notifies that <paramref name="session"/> has ended.</summary>
    Task EndAsync(Session session);
}
interface IPingMonitor : IDisposable
{
Task PingAsync(AgentId agentId);
}struct AgentId
{
    /// <summary>Wraps a raw integer id, so an <c>int</c> can be passed where an AgentId is expected.</summary>
    public static implicit operator AgentId(int value) => new AgentId { Value = value };
    /// <summary>Unwraps the raw integer id.</summary>
    public static implicit operator int(AgentId agentId) => agentId.Value;
    // The wrapped id; no access modifier, so it is private to the struct.
    int Value { get; set; }
    /// <summary>Formats the id as "Agent #N".</summary>
    public override string ToString() => $"Agent #{Value}";
}class Session : ValueObject<Session>
{
    // Time after the last activity at which a session counts as expired.
    private static readonly TimeSpan Ttl = TimeSpan.FromMinutes(10);
    /// <summary>
    /// Creates a new session whose start time and last activity are both <paramref name="at"/>.
    /// </summary>
    public static Session Start(AgentId agentId, DateTimeOffset at) =>
        new Session(agentId, at, at);
    /// <summary>Creates a session with explicit start and last-activity timestamps.</summary>
    /// <param name="agentId">Agent the session belongs to.</param>
    /// <param name="started">When the session began.</param>
    /// <param name="lastActivity">When the agent was last heard from.</param>
    public Session(AgentId agentId, DateTimeOffset started, DateTimeOffset lastActivity)
    {
        AgentId = agentId;
        Started = started;
        LastActivity = lastActivity;
    }
    /// <summary>Agent the session belongs to.</summary>
    public AgentId AgentId { get; }
    /// <summary>When the session began.</summary>
    public DateTimeOffset Started { get; }
    /// <summary>When the agent was last heard from.</summary>
    public DateTimeOffset LastActivity { get; }
    /// <summary>
    /// True when <paramref name="at"/> is strictly later than the last activity plus the TTL.
    /// </summary>
    public bool IsExpired(DateTimeOffset at) => LastActivity + Ttl < at;
public bool TryExtend(DateTimeOffset till, out Session extended)
{
extended = this;
if (IsExpired(till))
return false;
extended = new Session(AgentId, Started, till);
return true;
}
    // Values handed to the ValueObject base class: AgentId and Started only;
    // LastActivity is deliberately left out of the list.
    // NOTE(review): presumably the base class uses these for Equals/GetHashCode, so an
    // extended session compares equal to the original - confirm against ValueObject.
    protected override IEnumerable<object> EqualityCheckAttributes =>
        new object[] { AgentId, Started };
}class PingMonitor : IPingMonitor
{
    // Pause between two sweeps for expired sessions (used by RunAsync).
    private static readonly TimeSpan Interval = TimeSpan.FromMinutes(1);
    // Sessions currently tracked, keyed by agent.
    AsyncDictionary<AgentId, Session> Sessions { get; } =
        new AsyncDictionary<AgentId, Session>();
    /// <summary>
    /// Stores the dependencies and starts the background sweep via Task.Run.
    /// The task returned by Task.Run is not kept or awaited.
    /// </summary>
    /// <param name="clock">Source of the current time.</param>
    /// <param name="monitor">Receiver of session start/end notifications.</param>
    public PingMonitor(IClock clock, ISessionMonitor monitor)
    {
        Clock = clock;
        Monitor = monitor;
        CancellationTokenSource = new CancellationTokenSource();
        Task.Run(RunAsync);
    }
    /// <summary>
    /// Signals cancellation to the background sweep and disposes the session monitor.
    /// The CancellationTokenSource itself is only cancelled here, not disposed.
    /// </summary>
    public void Dispose()
    {
        CancellationTokenSource.Cancel();
        Monitor.Dispose();
    }
    // Source of the current time for starting, extending and expiring sessions.
    IClock Clock { get; }
    // Receiver of session start/end notifications; disposed together with this object.
    ISessionMonitor Monitor { get; }
    // Cancelled in Dispose to stop the loop in RunAsync.
    CancellationTokenSource CancellationTokenSource { get; }
    /// <summary>
    /// Records a ping from <paramref name="agentId"/>: starts a session or extends the current one.
    /// </summary>
    public Task PingAsync(AgentId agentId) =>
        StartOrExtendAsync(agentId);
async Task StartOrExtendAsync(AgentId agentId)
{
var result = await Sessions.AddOrUpdateAsync(
agentId,
() => Session.Start(agentId, Clock.GetTime()),
(ref Session s) => s.TryExtend(Clock.GetTime(), out s));
switch (result.Action)
{
case ValueAction.Added:
await Monitor.StartAsync(result.Value);
return;
case ValueAction.Updated:
return;
case ValueAction.Cancelled:
await EndAsync(agentId);
await StartOrExtendAsync(agentId);
return;
default:
throw new NotImplementedException();
}
}
async Task EndAsync(AgentId agentId)
{
var result = Sessions.Remove(agentId, s => s.IsExpired(Clock.GetTime()));
switch (result.Action)
{
case ValueAction.Removed:
await Monitor.EndAsync(result.Value);
return;
case ValueAction.Cancelled:
return;
default:
throw new NotImplementedException();
}
}
async Task RunAsync()
{
while (!CancellationTokenSource.Token.IsCancellationRequested)
{
await Task.Delay(Interval, CancellationTokenSource.Token);
await Task.WhenAll(from r in Sessions.Remove(s => s.IsExpired(Clock.GetTime()))
select Monitor.EndAsync(r.Value));
}
}
}
Context
StackExchange Code Review Q#129689, answer score: 3
Revisions (0)
No revisions yet.