General multithreaded file processing
Problem
I'm trying to write a general multithreaded file processing facility. The idea is that an input file consists of a number of discrete records, each record needs to be processed in the same manner, and the result needs to be written out to another file. I would like to multithread this. I've tried to make this as simple as possible, so all the user needs to define is:
- How to get the next record from the input file.
- How to process each record.
- How to write a processed record to the output file.
I have a thread-safe queue (taken from C++ Concurrency in Action). My basic idea is to use two of these queues: one to buffer the records read by a 'reader' thread, and one to buffer the processed records produced by a number of 'worker' threads, which are then written to a file by a single 'writer' thread. Note that this solution does not assume the record order will be maintained.
One problem is how to notify the worker and writer threads of the last record. I've solved this by using a wrapper class around the user's input type. It's not pretty, but it works. Any other suggestions are welcome.
I'm looking for any runtime performance optimisations, and general design improvements.
```
#include <fstream>
#include <string>
#include "threadsafe_queue.h"

// Wraps a user record together with an end-of-input flag.
template <typename T>
struct QueueItem
{
    bool is_sentinel;
    T item;
    QueueItem() {}
    QueueItem(bool is_sentinel_) : is_sentinel(is_sentinel_) {}
    QueueItem(bool is_sentinel_, T item_) : is_sentinel(is_sentinel_), item(item_) {}
};

// Reads records with get_item and queues them, then queues one sentinel per worker.
template <typename TIn, typename FGet>
void reader(threadsafe_queue<QueueItem<TIn>>& queue, const std::string& filename, FGet get_item, unsigned num_worker_threads)
{
    std::ifstream in_file(filename, std::ios::in | std::ios::binary);
    if (in_file) {
        TIn item;
        bool is_finished = false;
        while (!is_finished) {
            is_finished = get_item(in_file, item);
            queue.push(QueueItem<TIn>(false, item));
        }
        for (; num_worker_threads > 0; --num_worker_threads) {
            queue.push(QueueItem<TIn>(true));
        }
    }
}
```
Solution
`static char complements[85];` is a potential disaster waiting to happen, because in `c = complements[record.second[i]];` we have no guarantee that the file hasn't given us some garbage outside the range 0-84.
Use `std::array`, indexed through its bounds-checked `at()` member, to get a nice throw on failure as basic error handling: `static std::array<char, 85> complements;`
This brings us to the next point: where do you handle the throws? Is it safe that the input and output files are not checked to have different names?
Regarding performance, the reader and writer should ensure that the files have a large buffer associated with them, as this improves disk performance. More exotic techniques, such as reading a block and doing your own search in it, can be considered, as the repeated calls to `std::getline()` might not be optimal.
`queue.push(QueueItem<TIn>(false, item));` Is the queue fully C++11? If so, you should use an `emplace_back()`-style member to construct the object directly in the queue, saving some copying.
Which brings us to `QueueItem` (`template<typename T> struct QueueItem`): the thread-safe queue may make too many copies and assignments, and too few moves. The same goes for `QueueItem`; a move is often better than a copy construction or assignment.
Process file and friends nearly scream to be made into a class; they are nearly there.
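To make the bounds concern about `complements` concrete, here is a minimal sketch of a checked lookup. The table contents are an assumption: the original initialisation is not shown, so the DNA-style mapping, `make_complements`, and `complement` are hypothetical names chosen for illustration. Note that `std::array` only throws when indexed through `at()`; `operator[]` is unchecked, just like the raw array.

```cpp
#include <array>
#include <stdexcept>
#include <string>

// Hypothetical table: 85 entries so that indices up to 'T' (84 in ASCII) are valid.
std::array<char, 85> make_complements()
{
    std::array<char, 85> table{};  // value-initialised: every entry starts as '\0'
    table['A'] = 'T';
    table['C'] = 'G';
    table['G'] = 'C';
    table['T'] = 'A';
    return table;
}

// Returns the complement of each character in sequence.
// Throws std::out_of_range for bytes >= 85 and std::invalid_argument
// for bytes that are in range but have no mapping.
std::string complement(const std::string& sequence)
{
    static const std::array<char, 85> complements = make_complements();

    std::string result;
    result.reserve(sequence.size());
    for (char ch : sequence) {
        // Go through unsigned char so bytes >= 0x80 become large indices, not negative ones.
        const unsigned char index = static_cast<unsigned char>(ch);
        const char mapped = complements.at(index);  // bounds-checked access
        if (mapped == '\0') {
            throw std::invalid_argument("no complement defined for this byte");
        }
        result.push_back(mapped);
    }
    return result;
}
```

A worker thread calling something like this still needs a place to catch the exception, which is the question the answer raises about where throws are handled.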
Context
StackExchange Code Review Q#62126, answer score: 4