pattern csharp Minor
TPL Dataflow BatchBlock with Batch Size & Timeout
with size dataflow batch batchblock timeout tpl
Problem
I needed a BatchBlock with support for a timeout (acting greedily on whatever it has received by that time), and after some searching and study, this is the outcome. What are the drawbacks of this code, and where is there room for improvement?
    class BatchedQueue
    {
        static readonly Logger SlotClassLogger = LogManager.GetCurrentClassLogger();
        protected static Logger ClassLogger { get { return SlotClassLogger; } }
    }
    class BatchedQueue<T> : BatchedQueue
    {
        readonly int _interval;
        readonly TimeSpan _loopWait;
        readonly BatchBlock<T> _queue;
        readonly Timer _timer;
        readonly BufferBlock<T> _source;

        public event Action<T[]> Received;

        protected virtual void OnReceived(T[] obj)
        {
            var handler = Received;
            if (handler != null) handler(obj);
        }

        public BatchedQueue(int interval, int size)
        {
            _interval = interval;
            _loopWait = TimeSpan.FromMilliseconds(2.19 * (1.3 + _interval)); //study
            _queue = new BatchBlock<T>(size, new GroupingDataflowBlockOptions { Greedy = true });
            _timer = new Timer(_ => _queue.TriggerBatch());
            var transformer = new TransformBlock<T, T>(v =>
            {
                _timer.Change(_interval, Timeout.Infinite);
                return v;
            });
            transformer.LinkTo(_queue);
            _source = new BufferBlock<T>();
            _source.LinkTo(transformer);
            Utl.SafeTask(Loop, t =>
            {
                if (t == null || t.Exception == null) return;
                ClassLogger.Error(t.Exception);
            }, TaskCreationOptions.LongRunning);
        }

        public virtual void Post(T t) { _source.Post(t); }

        void Loop()
        {
            while (true)
            {
                var buffer = _queue.Receive(_loopWait);
                if (buffer != null) { OnReceived(buffer); }
            }
        }
    }

Solution
    class BatchedQueue

What's the reason for having this separate class? Is it so that there is only one instance of SlotClassLogger? If that's the case, that sounds like premature optimization to me.

    class BatchedQueue<T> : BatchedQueue

You're not implementing (or exposing in any other way) any of the dataflow interfaces. That means this class can't easily be used as part of a dataflow network.
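One way to expose the dataflow interfaces is DataflowBlock.Encapsulate, which wraps a target and a source into a single IPropagatorBlock. The following is only a minimal sketch under assumptions: the TimedBatch class and its Create method are hypothetical names, and the timeout is taken as a TimeSpan rather than the int used in the posted code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

static class TimedBatch
{
    // Hypothetical factory (the name is an assumption): returns a block that emits
    // a batch when `size` items have arrived or when `timeout` has elapsed since
    // the last item was received.
    public static IPropagatorBlock<T, T[]> Create<T>(int size, TimeSpan timeout)
    {
        var batch = new BatchBlock<T>(size);
        var timer = new Timer(_ => batch.TriggerBatch());

        // Restart the timer on every item, then pass the item on unchanged.
        var input = new TransformBlock<T, T>(item =>
        {
            timer.Change(timeout, Timeout.InfiniteTimeSpan);
            return item;
        });
        input.LinkTo(batch, new DataflowLinkOptions { PropagateCompletion = true });

        // Release the timer once the batch block has finished.
        batch.Completion.ContinueWith(_ => timer.Dispose());

        // Callers see one block: items go into `input`, batches come out of `batch`.
        return DataflowBlock.Encapsulate<T, T[]>(input, batch);
    }
}
```

A block created this way can be linked with LinkTo like any built-in block, and its Complete() and Completion members come from the wrapped blocks.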
    readonly int _interval;

When you have a value that is in some unit of measure, you should always very clearly specify which unit it uses. (You don't want your Mars probe to crash, right?) You can do that either in the name of the variable or in a comment.

Though it seems you're not using any common unit. That should also be documented very clearly, and there should be a very good reason for it. If the unit is something specific to your domain, then I don't think it belongs here; a class like this should be reusable.

If the condition for triggering is not actually some amount of time elapsing, then consider using that condition directly, instead of approximating it with a timer.
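The posted code passes _interval straight to Timer.Change(int, int), which reads its arguments as milliseconds. Assuming milliseconds is indeed what is intended, the unit could be made explicit along these lines (BatchTimeoutSketch is a hypothetical name used only for illustration):

```csharp
using System;

class BatchTimeoutSketch
{
    // Option 1: keep the int, but put the unit in the name.
    public int IntervalMilliseconds { get; }

    // Option 2: let the type carry the unit.
    public TimeSpan Interval { get; }

    public BatchTimeoutSketch(TimeSpan interval)
    {
        Interval = interval;
        IntervalMilliseconds = (int)interval.TotalMilliseconds;
    }
}
```

Taking a TimeSpan in the constructor also means callers write TimeSpan.FromSeconds(2) or TimeSpan.FromMilliseconds(200) and never have to guess the unit.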
    transformer.LinkTo(_queue);
    _source = new BufferBlock<T>();
    _source.LinkTo(transformer);

I don't see any reason for the _source buffer block here. You could send items directly into transformer.

    while (true)
    {
        var buffer = _queue.Receive(_loopWait);
        if (buffer != null) { OnReceived(buffer); }
    }

This code is blocking a thread unnecessarily when there is no work to do. The simplest solution would be to use ActionBlock to execute OnReceived.

There is no way to tell the queue to stop processing or for you to wait until its processing is done. Consider adding the Complete()/Completion pair used in dataflow blocks.

When dealing with time, Rx is often better than Dataflow. A behavior similar to yours could be achieved by using
Buffer():

    _source = new Subject<T>();
    _source.Buffer(_loopWait, size)
           .Subscribe(OnReceived);

Though this measures the timeout since the last batch, not since the last element. I think you could achieve that using something like (based on this SO answer):
    _source.Buffer(() =>
            Observable.Merge(_source.Throttle(_loopWait).Take(1), _source.Skip(size - 1).Take(1)))
        .Subscribe(OnReceived);

Code Snippets
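The ActionBlock and Complete()/Completion suggestions from the answer can be sketched on a plain BatchBlock, leaving the timer out to keep the example short. BatchConsumerSketch and RunAsync are hypothetical names, not part of the posted code; treat this as an illustration rather than a drop-in replacement.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BatchConsumerSketch
{
    // Hypothetical helper: pushes `items` through a BatchBlock and hands every
    // batch to `onReceived` via an ActionBlock, so no thread sits in a blocking
    // Receive loop while there is nothing to do.
    public static async Task RunAsync<T>(IEnumerable<T> items, int size, Action<T[]> onReceived)
    {
        var batch = new BatchBlock<T>(size);
        var handler = new ActionBlock<T[]>(onReceived);
        batch.LinkTo(handler, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in items)
            batch.Post(item);

        batch.Complete();          // no more input; a partial last batch is flushed
        await handler.Completion;  // finishes after the last batch has been handled
    }
}
```

With PropagateCompletion set on the link, completing the first block is enough: completion flows down to the ActionBlock, and its Completion task gives callers something to wait on.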
    class BatchedQueue

    class BatchedQueue<T> : BatchedQueue

    readonly int _interval;

    transformer.LinkTo(_queue);
    _source = new BufferBlock<T>();
    _source.LinkTo(transformer);

    while (true)
    {
        var buffer = _queue.Receive(_loopWait);
        if (buffer != null) { OnReceived(buffer); }
    }

Context
StackExchange Code Review Q#65301, answer score: 3