HiveBrain v1.2.0
pattern · csharp · Minor

TPL Dataflow BatchBlock with Batch Size & Timeout

Submitted by: @import:stackexchange-codereview
Tags: with · size · dataflow · batch · batchblock · timeout · tpl

Problem

I needed a BatchBlock with timeout support (so it greedily emits whatever it has received when the timeout elapses). After some searching and study, this is the outcome.

What are the drawbacks or areas for improvement in this code?

class BatchedQueue
{
    static readonly Logger SlotClassLogger = LogManager.GetCurrentClassLogger();
    protected static Logger ClassLogger { get { return SlotClassLogger; } }
}

class BatchedQueue<T> : BatchedQueue
{
    readonly int _interval;
    readonly TimeSpan _loopWait;
    readonly BatchBlock<T> _queue;
    readonly Timer _timer;
    readonly BufferBlock<T> _source;

    public event Action<T[]> Received;

    protected virtual void OnReceived(T[] obj)
    {
        var handler = Received;
        if (handler != null) handler(obj);
    }

    public BatchedQueue(int interval, int size)
    {
        _interval = interval;

        _loopWait = TimeSpan.FromMilliseconds(2.19 * (1.3 + _interval)); //study

        _queue = new BatchBlock<T>(size, new GroupingDataflowBlockOptions { Greedy = true });
        _timer = new Timer(_ => _queue.TriggerBatch());

        var transformer = new TransformBlock<T, T>(v =>
        {
            _timer.Change(_interval, Timeout.Infinite);
            return v;
        });
        transformer.LinkTo(_queue);
        _source = new BufferBlock<T>();
        _source.LinkTo(transformer);

        Utl.SafeTask(Loop, t =>
        {
            if (t == null || t.Exception == null) return;
            ClassLogger.Error(t.Exception);
        }, TaskCreationOptions.LongRunning);
    }

    public virtual void Post(T t) { _source.Post(t); }
    void Loop()
    {
        while (true)
        {
            var buffer = _queue.Receive(_loopWait);
            if (buffer != null) { OnReceived(buffer); }
        }
    }
}

Solution

class BatchedQueue


What's the reason for having this separate class? Is it so that there is only one instance of SlotClassLogger? If that's the case, that sounds like premature optimization to me.

class BatchedQueue<T> : BatchedQueue


You're not implementing (or exposing in other way) any of the dataflow interfaces. That means this class can't easily be used as a part of a dataflow network.
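A minimal sketch of one way to do that (an assumption, not part of the original answer): implement `ITargetBlock<T>` by delegating to the internal `_source` block from the question's code, so the class can be the target of a `LinkTo` call.

```csharp
// Sketch only: expose BatchedQueue<T> as a dataflow target by
// delegating ITargetBlock<T> to the internal _source buffer block.
class BatchedQueue<T> : BatchedQueue, ITargetBlock<T>
{
    // ... existing members from the question ...

    public void Complete() { _source.Complete(); }

    public Task Completion { get { return _source.Completion; } }

    void IDataflowBlock.Fault(Exception exception)
    {
        ((IDataflowBlock)_source).Fault(exception);
    }

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
    {
        return ((ITargetBlock<T>)_source).OfferMessage(
            messageHeader, messageValue, source, consumeToAccept);
    }
}
```

With this in place, other blocks can link into the queue with `someSourceBlock.LinkTo(batchedQueue)`.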

readonly int _interval;


When you have a value that is of some unit of measure, you should always very clearly specify what unit it uses. (You don't want your Mars probe to crash, right?) You can do that either in the name of the variable or in a comment.

Though it seems you're not using any common unit. That should also be documented very clearly, and it should have a very good reason. If the unit is something specific to your domain, then I don't think it belongs here; a class like this should be reusable.

If the condition for triggering is not actually some time elapsing, then consider using that condition directly, instead of approximating it with a timer.
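One sketch of the unit-of-measure advice: take a `TimeSpan` instead of a raw `int`, so the unit is explicit in the signature (the field type change is my assumption, not part of the original answer).

```csharp
// Sketch: a TimeSpan parameter removes any ambiguity about the unit.
public BatchedQueue(TimeSpan interval, int size)
{
    _interval = interval;   // field changed from int to TimeSpan

    // ... set up blocks as before ...

    // Timer.Change has an overload that accepts TimeSpan directly:
    // _timer.Change(_interval, Timeout.InfiniteTimeSpan);
}
```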

transformer.LinkTo(_queue);
_source = new BufferBlock<T>();
_source.LinkTo(transformer);


I don't see any reason for the _source buffer block here. You could send items directly into transformer.
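A sketch of that simplification, assuming `transformer` is promoted to a field (my naming, not from the original answer):

```csharp
// Sketch: post straight into the transformer and drop _source entirely.
readonly TransformBlock<T, T> _transformer;

public virtual void Post(T t) { _transformer.Post(t); }
```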

while (true)
{
    var buffer = _queue.Receive(_loopWait);
    if (buffer != null) { OnReceived(buffer); }
}


This code is blocking a thread unnecessarily when there is no work to do. The simplest solution would be to use ActionBlock to execute OnReceived.
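A sketch of that suggestion: linking an `ActionBlock<T[]>` to the batch block means no thread sits parked in `Receive` between batches; the delegate only runs when a batch actually arrives.

```csharp
// Sketch: replace the Loop/Receive thread with an ActionBlock consumer.
var consumer = new ActionBlock<T[]>(batch => OnReceived(batch));
_queue.LinkTo(consumer);
```

This also makes the hand-tuned `_loopWait` polling interval unnecessary.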

There is no way to tell the queue to stop processing, or for you to wait until its processing is done. Consider adding the Complete()/Completion pair used in dataflow blocks.
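A sketch of what that could look like; the key assumption (mine, not the original answer's) is that the internal links propagate completion, so completing the head of the chain eventually completes the tail.

```csharp
// Sketch: links must propagate completion for Complete() to flow through.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
_source.LinkTo(transformer, linkOptions);
transformer.LinkTo(_queue, linkOptions);

// Then expose the usual dataflow-style pair:
public void Complete() { _source.Complete(); }
public Task Completion { get { return _queue.Completion; } }
```

Callers can then shut down with `queue.Complete(); await queue.Completion;`.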

When dealing with time, Rx is often better than Dataflow. A behavior similar to yours could be achieved by using Buffer():

_source = new Subject<T>();
_source.Buffer(_loopWait, size)
       .Subscribe(OnReceived);


Though this measures the timeout since the last batch, not since the last element. I think you could achieve that using something like (based on this SO answer):

_source.Buffer(() => 
    Observable.Merge(_source.Throttle(_loopWait).Take(1), _source.Skip(size - 1).Take(1)))
    .Subscribe(OnReceived);


Context

StackExchange Code Review Q#65301, answer score: 3
