HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

Concurrent non-blocking update of cached list of on-line users

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
cachedupdatelinenonblockinglistusersconcurrent

Problem

I have following problem: My server (ASP.MVC WebAPI) is tracking, when client application ("Agent") is on-line. It's storing this inf on following table:

+-------+-------------------------------+-----+
|AgentId|              Date             |State|
+-------+-------------------------------+-----+
| 1     | 30 may 2016 г. 3:02:20 +03:00 |True |
| 1     | 30 may 2016 г. 0:25:26 +02:00 |True |
| 1     |29 may 2016 г. 23:05:59 +02:00 |False|
| 1     |29 may 2016 г. 23:05:01 +02:00 |True |
+-------+-------------------------------+-----+


"Agent" software is sending some data to the server with some frequecy, each HTTP request is threated as ping/keep alive, and tels server, that agent is Up and running.

Algorithm is following:

  • Ping received, if previous ping time isn't stored in RAM cache:



  • Write ping time into cache



  • Add DB record, that "Agent" become on-line



  • Ping received, if previous ping time is stored in RAM cache:



  • update ping time in cache



  • Server haven'r received anything from "Agent" for 10 minutes (AgentTimeout)



  • remove this agent's record from cache



  • add record to DB that agent wen't offline



Following implementation is using non blocking algorithm. I'd like you to review correctness of this algorithm.

```
namespace X
{
[SingletonScope]
public class AgentPingReceiver : IAgentPingReceiver
{
private ILogger Log = Serilog.Log.Logger.ForContext();

private static readonly TimeSpan AgentTimeout = TimeSpan.FromMinutes(10);
private static readonly TimeSpan StateRefreshInterval = TimeSpan.FromMinutes(1);

private CancellationTokenSource _cancelToken;
ConcurrentDictionary _lastPingReceived;

public AgentPingReceiver()
{
_lastPingReceived = new ConcurrentDictionary();
_cancelToken = new CancellationTokenSource();

Task.Run((Func)BackgroundChecker,_cancelToken.Token);
}

private async Task BackgroundChecker()

Solution

OK, it is going to be a long answer :)

UPDATE

Actually, marking for deletion is not necessary, see Little-known gems: Atomic conditional removals from ConcurrentDictionary.

I would still consider the rest of the answer as an approach to structure the task.

END OF UPDATE

ConcurrentDictionary - when you about to remove from it - first mark for deletion to get rid of racing conditions. I defined this helper class to assist with this task:

class AsyncDictionary
    where TValue : class
{
    ConcurrentDictionary Values { get; } =
        new ConcurrentDictionary();

    public delegate bool TryUpdate(ref TValue value);
    
    /// 
    /// It makes multiple attempts to execute if item under the question is marked for deletion.
    /// 
    /// Well, the key.
    /// Value factory for add opeartion.
    /// It will try to update, but might reject to do so.
    /// Reports actual operation executed and the value assigned.
    public async Task AddOrUpdateAsync(TKey key, Func add, TryUpdate tryUpdate)
    {
        while (true)
        {
            var result = Result.Cancelled;
            Values.AddOrUpdate(key,
                (k) =>
                {
                    result = new Result(ValueAction.Added, add());
                    return result.Value;
                },
                (k, v) =>
                {
                    if (v == null)
                        result = Result.Delayed;
                    else                        
                        result = tryUpdate(ref v) ?
                            new Result(ValueAction.Updated, v) :
                            Result.Cancelled;                        

                    return v;
                });

            if (result.Action == ValueAction.Delayed)
                await Task.Delay(10);
            else
                return result;
        }
    }

    /// 
    /// Concurrent remove first marks the entry for deletion by assigning null 
    /// to the value. 
    /// 
    /// Well, the key.
    /// Checks if we really want to kill it.
    /// 
    public Result Remove(TKey key, Predicate guard)
    {
        TValue value;
        if (!Values.TryGetValue(key, out value))
            return Result.Cancelled;

        if (value == null)
            return Result.Cancelled;

        if (!guard(value))
            return Result.Cancelled;

        if (!Values.TryUpdate(key, null, value))
            return Result.Cancelled;

        TValue nullValue;
        Values.TryRemove(key, out nullValue);
        if (nullValue != null)
            throw new NotImplementedException();

        return new Result(ValueAction.Removed, value);
    }

    /// 
    /// Removes everything matching the predicate.
    /// 
    /// To be or not to be.
    /// The corpses.
    public IEnumerable Remove(Predicate predicate) =>
        from kvp in Values
        where predicate(kvp.Value)
        select Remove(kvp.Key, predicate) into r
        where r.Action == ValueAction.Removed
        select r;        

    public struct Result
    {
        public static readonly Result Cancelled = new Result(ValueAction.Cancelled);
        public static readonly Result Removed = new Result(ValueAction.Removed);
        public static readonly Result Delayed = new Result(ValueAction.Delayed);

        public Result(ValueAction action)
            : this(action, null)
        {
        }

        public Result(ValueAction action, TValue value)
        {
            Action = action;
            Value = value;
        }

        public ValueAction Action { get; }
        public TValue Value { get; }
    }
}

enum ValueAction
{
    Added,
    Updated,
    Removed,
    Cancelled,
    Delayed
}


Now let’s define abstractions for proper dependency management:

interface IClock
{
    DateTimeOffset GetTime();
}

interface ISessionMonitor : IDisposable
{
    Task StartAsync(Session session);
    Task EndAsync(Session session);
}

interface IPingMonitor : IDisposable
{
    Task PingAsync(AgentId agentId);
}


A little bit of "explicit language" - not necessary at all :)

struct AgentId
{
    public static implicit operator AgentId(int value) => new AgentId { Value = value };
    public static implicit operator int(AgentId agentId) => agentId.Value;
    int Value { get; set; }
    public override string ToString() => $"Agent #{Value}";
}


We are defining one business object, it knows how to calculate expiration:

```
class Session : ValueObject
{
private static readonly TimeSpan Ttl = TimeSpan.FromMinutes(10);

public static Session Start(AgentId agentId, DateTimeOffset at) =>
new Session(agentId, at, at);

public Session(AgentId agentId, DateTimeOffset started, DateTimeOffset lastActivity)
{
AgentId = agentId;
Started = started;
LastActivity = lastActivity;
}

public AgentId AgentId { get; }
public DateTimeOffset Started { get; }
public DateTimeOffset LastActivity { get; }

Code Snippets

class AsyncDictionary<TKey, TValue>
    where TValue : class
{
    ConcurrentDictionary<TKey, TValue> Values { get; } =
        new ConcurrentDictionary<TKey, TValue>();

    public delegate bool TryUpdate(ref TValue value);
    
    /// <summary>
    /// It makes multiple attempts to execute if item under the question is marked for deletion.
    /// </summary>
    /// <param name="key">Well, the key.</param>
    /// <param name="add">Value factory for add opeartion.</param>
    /// <param name="tryUpdate">It will try to update, but might reject to do so.</param>
    /// <returns>Reports actual operation executed and the value assigned.</returns>
    public async Task<Result> AddOrUpdateAsync(TKey key, Func<TValue> add, TryUpdate tryUpdate)
    {
        while (true)
        {
            var result = Result.Cancelled;
            Values.AddOrUpdate(key,
                (k) =>
                {
                    result = new Result(ValueAction.Added, add());
                    return result.Value;
                },
                (k, v) =>
                {
                    if (v == null)
                        result = Result.Delayed;
                    else                        
                        result = tryUpdate(ref v) ?
                            new Result(ValueAction.Updated, v) :
                            Result.Cancelled;                        

                    return v;
                });

            if (result.Action == ValueAction.Delayed)
                await Task.Delay(10);
            else
                return result;
        }
    }

    /// <summary>
    /// Concurrent remove first marks the entry for deletion by assigning null 
    /// to the value. 
    /// </summary>
    /// <param name="key">Well, the key.</param>
    /// <param name="guard">Checks if we really want to kill it.</param>
    /// <returns></returns>
    public Result Remove(TKey key, Predicate<TValue> guard)
    {
        TValue value;
        if (!Values.TryGetValue(key, out value))
            return Result.Cancelled;

        if (value == null)
            return Result.Cancelled;

        if (!guard(value))
            return Result.Cancelled;

        if (!Values.TryUpdate(key, null, value))
            return Result.Cancelled;

        TValue nullValue;
        Values.TryRemove(key, out nullValue);
        if (nullValue != null)
            throw new NotImplementedException();

        return new Result(ValueAction.Removed, value);
    }

    /// <summary>
    /// Removes everything matching the predicate.
    /// </summary>
    /// <param name="predicate">To be or not to be.</param>
    /// <returns>The corpses.</returns>
    public IEnumerable<Result> Remove(Predicate<TValue> predicate) =>
        from kvp in Values
        where predicate(kvp.Value)
        select Remove(kvp.Key, predicate) into r
        where r.Action == ValueAction.Removed
        select r;        

    public struct Result
    {
        public static 
interface IClock
{
    DateTimeOffset GetTime();
}

interface ISessionMonitor : IDisposable
{
    Task StartAsync(Session session);
    Task EndAsync(Session session);
}

interface IPingMonitor : IDisposable
{
    Task PingAsync(AgentId agentId);
}
struct AgentId
{
    public static implicit operator AgentId(int value) => new AgentId { Value = value };
    public static implicit operator int(AgentId agentId) => agentId.Value;
    int Value { get; set; }
    public override string ToString() => $"Agent #{Value}";
}
class Session : ValueObject<Session>
{
    private static readonly TimeSpan Ttl = TimeSpan.FromMinutes(10);

    public static Session Start(AgentId agentId, DateTimeOffset at) =>
        new Session(agentId, at, at);

    public Session(AgentId agentId, DateTimeOffset started, DateTimeOffset lastActivity)
    {
        AgentId = agentId;
        Started = started;
        LastActivity = lastActivity;
    }

    public AgentId AgentId { get; }
    public DateTimeOffset Started { get; }
    public DateTimeOffset LastActivity { get; }
    public bool IsExpired(DateTimeOffset at) => LastActivity + Ttl < at;

    public bool TryExtend(DateTimeOffset till, out Session extended)
    {
        extended = this;
        if (IsExpired(till))
            return false;

        extended = new Session(AgentId, Started, till);
        return true;
    }

    protected override IEnumerable<object> EqualityCheckAttributes =>
        new object[] { AgentId, Started };
}
class PingMonitor : IPingMonitor
{
    private static readonly TimeSpan Interval = TimeSpan.FromMinutes(1);
    AsyncDictionary<AgentId, Session> Sessions { get; } =
        new AsyncDictionary<AgentId, Session>();

    public PingMonitor(IClock clock, ISessionMonitor monitor)
    {
        Clock = clock;
        Monitor = monitor;
        CancellationTokenSource = new CancellationTokenSource();
        Task.Run(RunAsync);
    }

    public void Dispose()
    {
        CancellationTokenSource.Cancel();
        Monitor.Dispose();
    }

    IClock Clock { get; }
    ISessionMonitor Monitor { get; }
    CancellationTokenSource CancellationTokenSource { get; }

    public Task PingAsync(AgentId agentId) =>
        StartOrExtendAsync(agentId);

    async Task StartOrExtendAsync(AgentId agentId)
    {
        var result = await Sessions.AddOrUpdateAsync(
            agentId,
            () => Session.Start(agentId, Clock.GetTime()),
            (ref Session s) => s.TryExtend(Clock.GetTime(), out s));

        switch (result.Action)
        {
            case ValueAction.Added:
                await Monitor.StartAsync(result.Value);
                return;

            case ValueAction.Updated:
                return;

            case ValueAction.Cancelled:
                await EndAsync(agentId);
                await StartOrExtendAsync(agentId);
                return;

            default:
                throw new NotImplementedException();
        }
    }

    async Task EndAsync(AgentId agentId)
    {
        var result = Sessions.Remove(agentId, s => s.IsExpired(Clock.GetTime()));
        switch (result.Action)
        {
            case ValueAction.Removed:
                await Monitor.EndAsync(result.Value);
                return;

            case ValueAction.Cancelled:
                return;

            default:
                throw new NotImplementedException();
        }
    }

    async Task RunAsync()
    {
        while (!CancellationTokenSource.Token.IsCancellationRequested)
        {
            await Task.Delay(Interval, CancellationTokenSource.Token);
            await Task.WhenAll(from r in Sessions.Remove(s => s.IsExpired(Clock.GetTime()))
                               select Monitor.EndAsync(r.Value));
        }
    }
}

Context

StackExchange Code Review Q#129689, answer score: 3

Revisions (0)

No revisions yet.