
Queue with multiple consumers and one producer

Submitted by: @import:stackexchange-codereview

Problem

I've written a queue that supports one producer and multiple consumer threads. The idea is that the queue creates a configurable number of long-running consumer threads. Internally I'm using a BlockingCollection to solve the producer-consumer problem. I've done a little testing via a console application, and it seems to work. Can somebody review the code and let me know if there are any flaws?

The code can be found on GitHub.

Example usage:

```
static void Main(string[] args)
{
    var q = new QueueWithMultipleConsumerThreads<int>(
            numberOfWorkerThreads: 10,
            actionToBeCalled: i =>
            {
                Console.WriteLine($"Consumed {i} from thread {Thread.CurrentThread.Name}, id: {Thread.CurrentThread.ManagedThreadId}");
            });

    // Add some entries to the queue
    for (int i = 0; i < 10000; i++)
    {
        q.Enque(i);
    }

    Thread.Sleep(5000); // Give the queue time to work
    q.Shutdown();
}
```


QueueWithMultipleConsumerThreads:

```
public class QueueWithMultipleConsumerThreads<T>
{
    private readonly ConcurrentBag<Thread> threads = new ConcurrentBag<Thread>();
    private readonly ConcurrentBag<Worker<T>> workers = new ConcurrentBag<Worker<T>>();
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    public QueueWithMultipleConsumerThreads(uint numberOfWorkerThreads, Action<T> actionToBeCalled)
    {
        if (numberOfWorkerThreads == 0) { throw new ArgumentException($"{nameof(numberOfWorkerThreads)} must be > 0"); }
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled)); }

        for (var i = 0; i < numberOfWorkerThreads; i++)
        {
            // ... (threadName and logger are defined in code missing from this excerpt)
            var w = new Worker<T>(this.queue, threadName, actionToBeCalled, logger);
            var t = new Thread(w.DoWork) { IsBackground = true, Name = threadName };

            this.workers.Add(w);
            this.threads.Add(t);
            t.Start();
        }
    }

    public void Enque(T item)
    {
        this.queue.Add(item);
    }

    public int Count()
    {
        return this.queue.Count;
    }

    // ... (the remainder of the class is truncated in the source)
```

Solution

You shouldn't create a CancellationTokenSource inside each worker. Instead, create a single instance inside your queue and pass the same token to all workers. Then you can call cancelationTokenSource.Cancel() inside the Shutdown method to send a cancellation signal to every worker. Also, instead of using shouldStop, you should check the token.IsCancellationRequested flag.
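A minimal sketch of that arrangement might look like the following. The names TokenWorker and SharedTokenQueue are hypothetical, and the shape of the worker is an assumption, since the question's Worker class is not shown in full. The queue owns one CancellationTokenSource, every worker receives the same token, and Shutdown cancels them all with a single call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical worker: it only receives a token, it never creates a source.
public sealed class TokenWorker<T>
{
    private readonly BlockingCollection<T> queue;
    private readonly Action<T> action;
    private readonly CancellationToken token;

    public TokenWorker(BlockingCollection<T> queue, Action<T> action, CancellationToken token)
    {
        this.queue = queue;
        this.action = action;
        this.token = token;
    }

    public void DoWork()
    {
        try
        {
            // The token replaces a hand-rolled shouldStop flag.
            while (!this.token.IsCancellationRequested)
            {
                // Take(token) blocks until an item is available and throws
                // OperationCanceledException when the token is cancelled.
                T item = this.queue.Take(this.token);
                this.action(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested while waiting: just leave the loop.
        }
    }
}

// Hypothetical queue: one source, shared by all workers.
public sealed class SharedTokenQueue<T>
{
    private readonly CancellationTokenSource cancellation = new CancellationTokenSource();
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();
    private readonly List<Thread> threads = new List<Thread>();

    public SharedTokenQueue(int numberOfWorkerThreads, Action<T> actionToBeCalled)
    {
        for (var i = 0; i < numberOfWorkerThreads; i++)
        {
            var worker = new TokenWorker<T>(this.queue, actionToBeCalled, this.cancellation.Token);
            var thread = new Thread(worker.DoWork) { IsBackground = true };
            this.threads.Add(thread);
            thread.Start();
        }
    }

    public void Enque(T item) => this.queue.Add(item);

    public void Shutdown()
    {
        // A single Cancel() reaches every worker at once.
        this.cancellation.Cancel();
        foreach (var thread in this.threads)
        {
            thread.Join();
        }
    }
}
```

Note that in this sketch any items still waiting in the queue when Shutdown is called are not processed; whether that is acceptable depends on the application.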

It is the other way around with worker threads. Those should probably be created by the workers themselves, so you do not have to store them separately. Good job on actually joining the threads you create; that's a good thing to do. But the timeout smells. Are you sure that you are not hiding an error? If the threads join successfully every time, you should call the regular Join() without a timeout. If they do not, that is something you should investigate and fix.
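One way to read this advice is sketched below, under the assumption that a worker takes its queue, action, and token through the constructor. SelfHostedWorker is a hypothetical name, not the class from the question. The worker creates and keeps its own Thread, so the queue only needs a list of workers, and Join() is called without a timeout so that a worker that fails to stop shows up as a hang instead of being silently skipped.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical worker that creates and owns its thread.
public sealed class SelfHostedWorker<T>
{
    private readonly BlockingCollection<T> queue;
    private readonly Action<T> action;
    private readonly CancellationToken token;
    private readonly Thread thread;

    public SelfHostedWorker(BlockingCollection<T> queue, Action<T> action,
                            CancellationToken token, string name)
    {
        this.queue = queue;
        this.action = action;
        this.token = token;
        // The thread is a private detail of the worker.
        this.thread = new Thread(this.DoWork) { IsBackground = true, Name = name };
    }

    public bool IsAlive => this.thread.IsAlive;

    public void Start() => this.thread.Start();

    // No timeout: if the worker never leaves its loop, the caller notices.
    public void Join() => this.thread.Join();

    private void DoWork()
    {
        try
        {
            while (!this.token.IsCancellationRequested)
            {
                this.action(this.queue.Take(this.token));
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested while blocked in Take: exit the thread.
        }
    }
}
```

With this shape, a Shutdown method would cancel the shared token and then call Join() on each worker in turn.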

Finally, you should pay attention to classes that implement IDisposable. Probably nothing horrible will happen if you forget to dispose of them, but it is still a good idea to clean things up yourself when you are done using your CancellationTokenSource or BlockingCollection instances.
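As a rough illustration, the queue itself could implement IDisposable and release both objects once its workers have stopped. DisposableQueue is a hypothetical, simplified stand-in for the class in the question, and folding the shutdown logic into Dispose is one possible design, not the only one.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical queue that cleans up its disposable fields.
public sealed class DisposableQueue<T> : IDisposable
{
    private readonly CancellationTokenSource cancellation = new CancellationTokenSource();
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();
    private readonly CancellationToken token;
    private readonly Action<T> action;
    private readonly Thread[] threads;
    private bool disposed;

    public DisposableQueue(int numberOfWorkerThreads, Action<T> actionToBeCalled)
    {
        this.action = actionToBeCalled;
        // Read the token once, while the source is certainly not disposed.
        this.token = this.cancellation.Token;
        this.threads = new Thread[numberOfWorkerThreads];
        for (var i = 0; i < numberOfWorkerThreads; i++)
        {
            this.threads[i] = new Thread(this.Consume) { IsBackground = true };
            this.threads[i].Start();
        }
    }

    public void Enque(T item) => this.queue.Add(item);

    public void Dispose()
    {
        if (this.disposed) { return; }
        this.disposed = true;

        // Stop the workers first, so nothing touches the objects disposed below.
        this.cancellation.Cancel();
        foreach (var thread in this.threads)
        {
            thread.Join();
        }

        this.cancellation.Dispose();
        this.queue.Dispose();
    }

    private void Consume()
    {
        try
        {
            while (!this.token.IsCancellationRequested)
            {
                this.action(this.queue.Take(this.token));
            }
        }
        catch (OperationCanceledException)
        {
            // Cancelled during Dispose: let the thread end.
        }
    }
}
```

A caller can then wrap the queue in a using statement so that cleanup happens even when an exception is thrown in the producing code.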

Context

StackExchange Code Review Q#156378, answer score: 3
