patterncsharpMinor
Producer / Consumer implementation for Parallel Processing in the TPL
Viewed 0 times
processingtheconsumerforparallelimplementationproducertpl
Problem
I've stripped this down to its bare bones removing stopping/cancellation logic etc... to keep it simple.
The
The
```
public class Consumer
{
private TaskFactory _factory;
private readonly BlockingCollection _entries;
public Consumer()
{
_entries = new BlockingCollection();
}
public void Start()
{
_factory = new TaskFactory();
try
{
_factory.StartNew(() =>
{
Parallel.ForEach(
_entries.GetConsumingEnumerable(),
new ParallelOptions() { MaxDegreeOfParallelism = 5 },
ProcessEntry
);
});
}
catch (OperationCanceledException oce) { }
}
public void Add(int entry)
{
_entries.Add(entry);
}
public bool Ready
{
get { return (_entries.Count ==
The
Producer is a very simple class containing a timer. At regular intervals the TimerOnElapsed will let the host now that it has another batch of items available. The onus is on the host to pull that next batch using GetNextBatch().public class Producer
{
public event EventHandler BatchAvailable;
private readonly Timer timer;
private int i;
public Producer()
{
i = 1;
timer = new Timer(5000);
timer.Elapsed += TimerOnElapsed;
}
public void Start()
{
timer.Enabled = true;
timer.Start();
}
private void TimerOnElapsed(object sender, ElapsedEventArgs e)
{
if (BatchAvailable != null)
BatchAvailable(sender , e);
}
public IEnumerable GetNextBatch()
{
var range = Enumerable.Range(i, i + 50).ToList();
i = i + 50;
return range;
}
}The
Consumer class uses a Concurrent.BlockingCollection to pass objects to a Parallel ForEach loop. The Consumer is READY for the next batch each time it empties the blocking collection of the existing batch.```
public class Consumer
{
private TaskFactory _factory;
private readonly BlockingCollection _entries;
public Consumer()
{
_entries = new BlockingCollection();
}
public void Start()
{
_factory = new TaskFactory();
try
{
_factory.StartNew(() =>
{
Parallel.ForEach(
_entries.GetConsumingEnumerable(),
new ParallelOptions() { MaxDegreeOfParallelism = 5 },
ProcessEntry
);
});
}
catch (OperationCanceledException oce) { }
}
public void Add(int entry)
{
_entries.Add(entry);
}
public bool Ready
{
get { return (_entries.Count ==
Solution
As a matter of being thread-safe, you should replace this:
with this:
Reason being, accessing the event field "raw", you may wind up with it going
private void TimerOnElapsed(object sender, ElapsedEventArgs e)
{
if (BatchAvailable != null)
BatchAvailable(sender , e);
}with this:
private void TimerOnElapsed(object sender, ElapsedEventArgs e)
{
var batchAvailable = this.BatchAvailable;
if (batchAvailable != null)
{
batchAvailable(sender, e);
}
}Reason being, accessing the event field "raw", you may wind up with it going
null between the if and the invocation itself.Code Snippets
private void TimerOnElapsed(object sender, ElapsedEventArgs e)
{
if (BatchAvailable != null)
BatchAvailable(sender , e);
}private void TimerOnElapsed(object sender, ElapsedEventArgs e)
{
var batchAvailable = this.BatchAvailable;
if (batchAvailable != null)
{
batchAvailable(sender, e);
}
}Context
StackExchange Code Review Q#8972, answer score: 4
Revisions (0)
No revisions yet.