patterncsharpMinor
Consuming chunks from ConcurrentQueue
Viewed 0 times
fromchunksconcurrentqueueconsuming
Problem
I need to implement a queue of requests which can be populated from multiple threads. When this queue becomes larger than 1000 completed requests, these requests should be stored into a database.
Is there any way to implement this without lock and isLoading? The current implementation is:
public class RequestQueue
{
// Requests waiting to be executed; Add/AddRange put items here from any thread.
private static readonly BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
// Executed requests waiting to be written to the repository in chunks.
private static readonly ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
// Set while one worker is writing a chunk to the repository (see EnqueueSaveRequest).
private static volatile bool isLoading = false;
// Guards the dequeue-and-insert section in EnqueueSaveRequest.
private static readonly object _lock = new object();
/// <summary>
/// Starts a task that runs <c>execute</c>, the loop that consumes the request queue.
/// </summary>
public static void Launch() => Task.Factory.StartNew(execute);
/// <summary>Adds a single request to the queue of requests to execute.</summary>
/// <param name="request">The request to enqueue.</param>
public static void Add(VerificationRequest request) => _queue.Add(request);
/// <summary>
/// Adds every request in <paramref name="requests"/> to the queue of requests to execute,
/// using up to three parallel workers.
/// </summary>
/// <param name="requests">The requests to enqueue.</param>
public static void AddRange(List<VerificationRequest> requests)
{
    var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };
    Parallel.ForEach(requests, options, request => _queue.Add(request));
}
/// <summary>
/// Consumes the request queue with up to five parallel workers, passing each
/// request to <c>EnqueueSaveRequest</c>.
/// </summary>
private static void execute()
{
    var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
    var pending = _queue.GetConsumingEnumerable();
    Parallel.ForEach(pending, options, EnqueueSaveRequest);
}
/// <summary>
/// Executes <paramref name="request"/>, stores the result in the storage queue and,
/// once more than 1000 results are waiting, writes a chunk of up to 1000 of them
/// to the repository.
/// </summary>
/// <param name="request">The request to execute.</param>
private static void EnqueueSaveRequest(VerificationRequest request)
{
    _storageQueue.Enqueue(new RequestExecuter().ExecuteVerificationRequest(request));

    // Cheap check outside the lock so most callers never contend for it.
    if (_storageQueue.Count <= 1000 || isLoading)
    {
        return;
    }

    lock (_lock)
    {
        // Re-check: another worker may have flushed while this one waited for the lock.
        if (_storageQueue.Count <= 1000 || isLoading)
        {
            return;
        }

        isLoading = true;
        try
        {
            var requestChunk = new List<VerificationRequest>(1000);
            for (var i = 0; i < 1000; i++)
            {
                VerificationRequest req;
                if (_storageQueue.TryDequeue(out req))
                {
                    requestChunk.Add(req);
                }
            }

            new VerificationRequestRepository().InsertRange(requestChunk);
        }
        finally
        {
            // Reset even when InsertRange throws; otherwise no chunk is ever written again.
            isLoading = false;
        }
    }
}
}
Is there any way to implement this without lock and isLoading?
Solution
Static class
As proposed by eurotrash I would not implement the class as static class because it has some disadvantages:
Naming
Logic
Is there any way to implement this without lock and isLoading?
Currently your solution uses up to 5 tasks that process the items from the _queue and put them into the _storageQueue.
If the last part (writing the items to the repository) is executed by a separate single task, the lock and the isLoading flag can be dropped.
In other words, the lock and the isLoading flag can be dropped if the last part (writing the items to the repository) runs on its own single task.
Tasks
-
If you have long-running tasks (like the one started in the method Launch), it is better to start them with the option TaskCreationOptions.LongRunning.
-
-
Code with the suggestions above applied:
As proposed by eurotrash I would not implement the class as static class because it has some disadvantages:
- It is only possible to have one instance of the class per application.
- It is not possible to mock the class (e.g. for unit tests or alternative implementations).
- It is not possible to use it with DI (dependency injection).
- Stateful static classes are not very common, so they may confuse other developers.
Naming
- isLoading should be renamed to _isLoading
- execute should be renamed to Execute or, even better, to a more descriptive name (e.g. StartConsumingTask)
Logic
Is there any way to implement this without lock and isLoading?
Currently your solution uses up to 5 tasks that process the items from the _queue and put them into the _storageQueue. If the _storageQueue has more than 1000 items, one of the 5 tasks writes them to the repository.
If the last part (writing the items to the repository) is executed by a separate single task, the lock and the isLoading flag can be dropped.
In other words, the lock and the isLoading flag can be dropped if the last part (writing the items to the repository) runs on its own single task.
Tasks
- If you have long-running tasks (like the one started in the method Launch), it is better to start them with the option TaskCreationOptions.LongRunning. Otherwise the thread for the task is taken from the thread pool, where it occupies a thread that is meant for executing short-running work.
- _queue.Add (in method AddRange) is a very fast operation. I don't see a reason to use Parallel.ForEach here - just add the items on the current thread.
- Parallel.ForEach in method execute makes sense if ExecuteVerificationRequest is really time-consuming - otherwise it only adds overhead and complexity.
Code with the suggestions above applied:
public class RequestQueue
{
// Requests waiting to be executed; Add/AddRange put items here.
private readonly BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
// Executed requests waiting to be written to the repository in chunks.
private readonly ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
// Private so that code outside this class cannot create instances directly.
private RequestQueue()
{
}
/// <summary>
/// Starts the two background loops: one that executes queued requests and one
/// that persists the executed requests in chunks.
/// </summary>
public void Launch()
{
    // The consuming loop blocks for the lifetime of the queue, so hint the scheduler
    // to give it a dedicated thread instead of tying up a thread-pool worker.
    Task.Factory.StartNew(StartConsumingTask, TaskCreationOptions.LongRunning);

    // StartPersistingTask is async. Task.Run unwraps the returned Task, whereas
    // Task.Factory.StartNew would produce a Task<Task> whose inner task is never observed.
    Task.Run(() => StartPersistingTask());
}
/// <summary>Adds a single request to the queue of requests to execute.</summary>
/// <param name="request">The request to enqueue.</param>
public void Add(VerificationRequest request)
{
    _queue.Add(request);
}
/// <summary>Adds every request in <paramref name="requests"/> via <c>Add</c>, in list order.</summary>
/// <param name="requests">The requests to enqueue.</param>
public void AddRange(List<VerificationRequest> requests) => requests.ForEach(r => Add(r));
/// <summary>
/// Consumes the request queue with up to five parallel workers, passing each
/// request to <c>EnqueueSaveRequest</c>.
/// </summary>
private void StartConsumingTask()
{
    var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
    var pending = _queue.GetConsumingEnumerable();
    Parallel.ForEach(pending, options, EnqueueSaveRequest);
}
/// <summary>
/// Executes <paramref name="request"/> and places the result on the storage queue.
/// </summary>
/// <param name="request">The request to execute.</param>
private void EnqueueSaveRequest(VerificationRequest request)
{
    var executer = new RequestExecuter();
    var executed = executer.ExecuteVerificationRequest(request);
    _storageQueue.Enqueue(executed);
}
/// <summary>
/// Loops forever: whenever more than 1000 executed requests are waiting, dequeues
/// up to 1000 of them and inserts them into the repository; otherwise waits 100 ms
/// before checking again.
/// </summary>
private async Task StartPersistingTask()
{
    while (true)
    {
        if (_storageQueue.Count > 1000)
        {
            var requestChunck = new List<VerificationRequest>();
            VerificationRequest req;
            for (var i = 0; i < 1000; i++)
            {
                if (_storageQueue.TryDequeue(out req))
                    requestChunck.Add(req);
            }
            new VerificationRequestRepository().InsertRange(requestChunck);
        }
        else
        {
            // Nothing to flush yet; yield instead of spinning.
            await Task.Delay(100);
        }
    }
}
// The single shared instance, exposed through Instance.
private static RequestQueue _instance = new RequestQueue();
public static RequestQueue Instance => _instance;
}
Code Snippets
public class RequestQueue
{
// Requests waiting to be executed; Add/AddRange put items here.
private readonly BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
// Executed requests waiting to be written to the repository in chunks.
private readonly ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
// Private so that the only instance is the one created by the static _instance field.
private RequestQueue()
{
}
/// <summary>
/// Starts the two background loops: one that executes queued requests and one
/// that persists the executed requests in chunks.
/// </summary>
public void Launch()
{
    // The consuming loop blocks for the lifetime of the queue, so hint the scheduler
    // to give it a dedicated thread instead of tying up a thread-pool worker.
    Task.Factory.StartNew(StartConsumingTask, TaskCreationOptions.LongRunning);

    // StartPersistingTask is async. Task.Run unwraps the returned Task, whereas
    // Task.Factory.StartNew would produce a Task<Task> whose inner task is never observed.
    Task.Run(() => StartPersistingTask());
}
/// <summary>Adds a single request to the queue of requests to execute.</summary>
/// <param name="request">The request to enqueue.</param>
public void Add(VerificationRequest request)
{
    _queue.Add(request);
}
/// <summary>Adds every request in <paramref name="requests"/> via <c>Add</c>, in list order.</summary>
/// <param name="requests">The requests to enqueue.</param>
public void AddRange(List<VerificationRequest> requests)
{
    requests.ForEach(Add);
}
/// <summary>
/// Consumes the request queue with up to five parallel workers, passing each
/// request to <c>EnqueueSaveRequest</c>.
/// </summary>
private void StartConsumingTask()
{
    var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
    var pending = _queue.GetConsumingEnumerable();
    Parallel.ForEach(pending, options, EnqueueSaveRequest);
}
/// <summary>
/// Executes <paramref name="request"/> and places the result on the storage queue.
/// </summary>
/// <param name="request">The request to execute.</param>
private void EnqueueSaveRequest(VerificationRequest request)
{
    var executer = new RequestExecuter();
    var executed = executer.ExecuteVerificationRequest(request);
    _storageQueue.Enqueue(executed);
}
/// <summary>
/// Loops forever: whenever more than 1000 executed requests are waiting, dequeues
/// up to 1000 of them and inserts them into the repository; otherwise waits 100 ms
/// before checking again.
/// </summary>
private async Task StartPersistingTask()
{
    while (true)
    {
        if (_storageQueue.Count <= 1000)
        {
            // Not enough items for a chunk yet; wait before polling again.
            await Task.Delay(100);
            continue;
        }

        var batch = new List<VerificationRequest>();
        for (var taken = 0; taken < 1000; taken++)
        {
            VerificationRequest item;
            if (_storageQueue.TryDequeue(out item))
            {
                batch.Add(item);
            }
        }

        new VerificationRequestRepository().InsertRange(batch);
    }
}
// The single shared instance, created when the type's static fields are initialized.
private static RequestQueue _instance = new RequestQueue();

/// <summary>Gets the shared <c>RequestQueue</c> instance.</summary>
public static RequestQueue Instance
{
    get { return _instance; }
}
}
Context
StackExchange Code Review Q#143010, answer score: 4
Revisions (0)
No revisions yet.