HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

Producer / Consumer implementation for Parallel Processing in the TPL

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
processingtheconsumerforparallelimplementationproducertpl

Problem

I've stripped this down to its bare bones removing stopping/cancellation logic etc... to keep it simple.

The Producer is a very simple class containing a timer. At regular intervals the TimerOnElapsed will let the host now that it has another batch of items available. The onus is on the host to pull that next batch using GetNextBatch().

public class Producer
{
    public event EventHandler BatchAvailable; 
    private readonly Timer timer;
    private int i;

    public Producer()
    {
        i = 1;
        timer = new Timer(5000);
        timer.Elapsed += TimerOnElapsed;
    }

    public void Start()
    {
        timer.Enabled = true;
        timer.Start();
    }

    private void TimerOnElapsed(object sender, ElapsedEventArgs e)
    {
        if (BatchAvailable != null)
            BatchAvailable(sender , e);
    }

    public IEnumerable GetNextBatch()
    {
        var range = Enumerable.Range(i, i + 50).ToList();
        i = i + 50;
        return range;
    }
}


The Consumer class uses a Concurrent.BlockingCollection to pass objects to a Parallel ForEach loop. The Consumer is READY for the next batch each time it empties the blocking collection of the existing batch.

```
public class Consumer
{
private TaskFactory _factory;
private readonly BlockingCollection _entries;

public Consumer()
{
_entries = new BlockingCollection();
}

public void Start()
{

_factory = new TaskFactory();

try
{
_factory.StartNew(() =>
{
Parallel.ForEach(
_entries.GetConsumingEnumerable(),
new ParallelOptions() { MaxDegreeOfParallelism = 5 },
ProcessEntry
);
});
}
catch (OperationCanceledException oce) { }
}

public void Add(int entry)
{
_entries.Add(entry);
}

public bool Ready
{
get { return (_entries.Count ==

Solution

As a matter of being thread-safe, you should replace this:

private void TimerOnElapsed(object sender, ElapsedEventArgs e)
    {
        if (BatchAvailable != null)
            BatchAvailable(sender , e);
    }


with this:

private void TimerOnElapsed(object sender, ElapsedEventArgs e)
    {
        var batchAvailable = this.BatchAvailable;

        if (batchAvailable != null)
        {
            batchAvailable(sender, e);
        }
    }


Reason being, accessing the event field "raw", you may wind up with it going null between the if and the invocation itself.

Code Snippets

private void TimerOnElapsed(object sender, ElapsedEventArgs e)
    {
        if (BatchAvailable != null)
            BatchAvailable(sender , e);
    }
private void TimerOnElapsed(object sender, ElapsedEventArgs e)
    {
        var batchAvailable = this.BatchAvailable;

        if (batchAvailable != null)
        {
            batchAvailable(sender, e);
        }
    }

Context

StackExchange Code Review Q#8972, answer score: 4

Revisions (0)

No revisions yet.