HiveBrain v1.2.0
pattern · cpp · Minor

General multithreaded file processing

Submitted by: @import:stackexchange-codereview
processing · file · multithreaded · general

Problem

I'm trying to write a general multithreaded file processing facility. The idea is that some input file consists of a number of discrete records, each record needs to be processed in the same manner, and be written back out to another file. I would like to multithread this. I've tried to make this as simple as possible, so all the user needs to define is:

  • How to get the next record from the input file.
  • How to process each record.
  • How to write a processed record to the output file.

I have a thread-safe queue (taken from C++ Concurrency in Action). My basic idea is to use two of these queues: one to buffer the records read by a 'reader' thread, and one to buffer the processed records produced by a number of 'worker' threads, which are then written to a file by a single 'writer' thread. Note that this solution does not guarantee that record order is maintained.

One problem is how to notify the worker and writer threads of the last record. I've solved this by using a wrapper class around the user's input type. It's not pretty, but it works. Any other suggestions are welcome.
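One alternative to sentinel records, offered here only as a sketch, is to let the queue itself be closed. The class below is a hypothetical closable_queue, a minimal stand-in for the book's threadsafe_queue rather than its actual interface: after close(), pop() keeps handing out whatever is left and then returns false, so the reader calls close() once instead of pushing one sentinel per worker.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical closable queue: no sentinel records are needed.
template <typename T>
class closable_queue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mut_);
            data_.push(std::move(value));
        }
        cond_.notify_one();
    }

    // Marks the end of input and wakes every waiting consumer, not just one.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mut_);
            closed_ = true;
        }
        cond_.notify_all();
    }

    // Blocks until an item is available (returns true) or the queue is
    // closed and already drained (returns false).
    bool pop(T& value) {
        std::unique_lock<std::mutex> lock(mut_);
        cond_.wait(lock, [this] { return !data_.empty() || closed_; });
        if (data_.empty()) return false;
        value = std::move(data_.front());
        data_.pop();
        return true;
    }

private:
    std::mutex mut_;
    std::condition_variable cond_;
    std::queue<T> data_;
    bool closed_ = false;
};
```

Each worker would then simply loop with while (q.pop(rec)) { ... }. For the output queue, the last worker to finish (tracked, for example, with a std::atomic counter of running workers) would call close() so that the writer's loop ends. The sketch assumes nothing is pushed after close().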

I'm looking for any runtime performance optimisations, and general design improvements.

```
#include <fstream>
#include <string>
#include "threadsafe_queue.h"

template<typename T>
struct QueueItem
{
    bool is_sentinel;
    T item;
    QueueItem() {}
    QueueItem(bool is_sentinel_) : is_sentinel(is_sentinel_) {}
    QueueItem(bool is_sentinel_, T item_) : is_sentinel(is_sentinel_), item(item_) {}
};

template<typename TIn, typename FGet>
void reader(threadsafe_queue<QueueItem<TIn>>& queue, const std::string& filename, FGet get_item, unsigned num_worker_threads)
{
    std::ifstream in_file(filename, std::ios::in | std::ios::binary);
    if (in_file) {
        TIn item;
        bool is_finished = false;
        while (!is_finished) {
            is_finished = get_item(in_file, item);
            queue.push(QueueItem<TIn>(false, item));
        }
        for (; num_worker_threads > 0; --num_worker_threads) {
            queue.push(QueueItem<TIn>(true));
        }
    }
}
// (the remainder of the original listing is truncated in this excerpt)
```

Solution

A potential disaster waiting to happen.

```
static char complements[85];
```

because in

```
c = complements[record.second[i]];
```

we don't know that the file system hasn't given us some garbage byte outside the range 0-84. If char is signed, a byte of 0x80 or above even becomes a negative index.

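Use std::array as basic error handling, and index it with at(), which throws std::out_of_range for an invalid index (operator[] does no bounds checking, just like the built-in array):

```
static std::array<char, 85> complements;
```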
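A small sketch of that suggestion, assuming the table keeps its size of 85 and is filled elsewhere; complement_of is a hypothetical helper name, not something from the question. It also converts through unsigned char, so a byte of 0x80 or above is rejected by the bounds check instead of turning into a negative index.

```cpp
#include <array>
#include <cassert>
#include <stdexcept>

// Same size as the original complements[85]; zero-initialised here and
// assumed to be filled in elsewhere by the real program.
static std::array<char, 85> complements{};

// Bounds-checked lookup for one byte of a record.
inline char complement_of(char c) {
    // The unsigned char conversion keeps high bytes non-negative; at() then
    // throws std::out_of_range for anything outside 0-84.
    return complements.at(static_cast<unsigned char>(c));
}
```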


This brings us to the next point: where do you handle the exceptions? And is it safe that nothing checks whether the input and output filenames differ?
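On the first question: an exception that escapes the function run by a std::thread calls std::terminate. One way to get it back to the caller, sketched here with hypothetical process_record and total_size functions and one task per record purely for brevity, is to launch the work with std::async: the exception is stored in the std::future and rethrown by get().

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical per-record work that can fail.
inline int process_record(const std::string& rec) {
    if (rec.empty()) throw std::runtime_error("empty record");
    return static_cast<int>(rec.size());
}

inline int total_size(const std::vector<std::string>& records) {
    std::vector<std::future<int>> results;
    for (const auto& rec : records) {
        // std::launch::async runs the task on a new thread; rec is copied into it.
        results.push_back(std::async(std::launch::async, process_record, rec));
    }
    int total = 0;
    for (auto& f : results) {
        total += f.get();  // rethrows the task's exception here, in the calling thread
    }
    return total;
}
```

In the queue-based design the same idea applies to each worker loop, with one caveat: if a worker stops early, whatever end-of-input signal it would have passed on to the writer never arrives, so the writer can wait forever unless that case is handled.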

Regarding performance, the reader and writer should ensure that the files have large buffers associated with them, as this improves disk performance. More exotic techniques, such as reading a whole block and searching it for record boundaries yourself, are worth considering, since repeated calls to std::getline() might not be optimal.
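A sketch of the block-reading idea, assuming records are '\n'-terminated lines as with std::getline() (no '\r\n' handling); read_lines_blockwise is a hypothetical name and the 1 MiB default block size is an arbitrary choice, not a measured optimum.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <fstream>
#include <string>
#include <utility>
#include <vector>

// Splits a file into '\n'-terminated records by reading large blocks with
// read() and searching each block, instead of calling std::getline() per record.
inline std::vector<std::string> read_lines_blockwise(const std::string& filename,
                                                     std::size_t block_size = 1 << 20) {
    std::ifstream in(filename, std::ios::in | std::ios::binary);
    std::vector<std::string> lines;
    std::vector<char> block(std::max<std::size_t>(block_size, 1));  // 0 would never advance
    std::string partial;  // a record that straddles two blocks
    // read() fails on the last, short block, but gcount() still reports its size.
    while (in.read(block.data(), static_cast<std::streamsize>(block.size())) || in.gcount() > 0) {
        const char* start = block.data();
        const char* const end = start + in.gcount();
        for (;;) {
            const char* nl = std::find(start, end, '\n');
            partial.append(start, nl);
            if (nl == end) break;  // the rest of this record is in the next block
            lines.push_back(std::move(partial));
            partial.clear();       // reset the moved-from string before reuse
            start = nl + 1;
        }
    }
    if (!partial.empty()) lines.push_back(std::move(partial));  // last line had no '\n'
    return lines;
}
```

For the buffer-size suggestion itself, in.rdbuf()->pubsetbuf() can be used, but its effect on file streams is implementation-defined; libstdc++, for example, only honours it before the file is opened.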

```
queue.push(QueueItem<TIn>(false, item));
```


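Is the queue fully C++11? If so, give it an emplace member that forwards its arguments to the underlying container's emplace() (std::queue::emplace, or emplace_back on a std::deque), so the object is constructed directly in the queue, saving some copying.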
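A sketch of such a member on a cut-down, hypothetical queue_sketch class that follows the book's general design (a std::queue guarded by a std::mutex, plus a condition variable) but is not its exact code; only emplace and a non-blocking try_pop are shown.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

template <typename T>
class queue_sketch {
public:
    // Constructs the T directly inside the underlying std::queue from the
    // forwarded arguments, instead of building a temporary and copying it in.
    template <typename... Args>
    void emplace(Args&&... args) {
        {
            std::lock_guard<std::mutex> lk(mut_);
            data_.emplace(std::forward<Args>(args)...);
        }
        cond_.notify_one();
    }

    // Non-blocking pop: moves the front element out if there is one.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lk(mut_);
        if (data_.empty()) return false;
        value = std::move(data_.front());
        data_.pop();
        return true;
    }

private:
    std::mutex mut_;
    std::queue<T> data_;
    std::condition_variable cond_;
};
```

With the question's QueueItem, the reader's push would then become queue.emplace(false, item);.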

Which brings us to QueueItem:

```
template<typename T>
struct QueueItem
```


The thread-safe queue may make too many copies and copy assignments, and too few moves. The same goes for QueueItem: moving is often cheaper than copy-constructing or copy-assigning.
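A sketch of a more move-friendly QueueItem (not the author's code): default member initializers, so neither the default constructor nor the sentinel constructor leaves a member uninitialised, and the payload taken by value and moved into place. With no user-declared copy or move operations, the compiler-generated move constructor and move assignment are used automatically.

```cpp
#include <cassert>
#include <utility>

template <typename T>
struct QueueItem
{
    bool is_sentinel = false;  // never left uninitialised
    T item{};                  // value-initialised when no payload is given

    QueueItem() = default;
    explicit QueueItem(bool is_sentinel_) : is_sentinel(is_sentinel_) {}

    // Pass by value, then move: an rvalue argument is only ever moved,
    // an lvalue argument is copied exactly once.
    QueueItem(bool is_sentinel_, T item_)
        : is_sentinel(is_sentinel_), item(std::move(item_)) {}
    // Rule of zero: the implicit copy/move operations do the rest.
};
```

Marking the one-argument constructor explicit is an extra change on top of the review: it stops a stray bool from silently converting into a QueueItem, while QueueItem<TIn>(true) still compiles.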

Process file and its friends practically scream to be made into a class; the code is nearly there already.
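A rough sketch of what such a class might look like, under several assumptions that are not from the original code: FileProcessor and run are hypothetical names, the three user callbacks are stored as std::function members, get returns false when no record could be read (the question's get_item returns an is_finished flag instead), records travel through std::optional so that an empty optional is the sentinel (C++17), and the queue is a minimal private stand-in for threadsafe_queue.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <istream>
#include <mutex>
#include <optional>
#include <ostream>
#include <queue>
#include <sstream>
#include <thread>
#include <utility>
#include <vector>

template <typename TIn, typename TOut>
class FileProcessor {
public:
    using GetFn = std::function<bool(std::istream&, TIn&)>;  // false: no more records
    using ProcessFn = std::function<TOut(const TIn&)>;
    using PutFn = std::function<void(std::ostream&, const TOut&)>;

    FileProcessor(GetFn get, ProcessFn process, PutFn put, unsigned workers)
        : get_(std::move(get)), process_(std::move(process)), put_(std::move(put)),
          workers_(workers == 0 ? 1 : workers) {}

    // One reader, workers_ workers and one writer; output order is not preserved.
    void run(std::istream& in, std::ostream& out) {
        std::thread reader([this, &in] { read_all(in); });
        std::vector<std::thread> pool;
        for (unsigned i = 0; i < workers_; ++i) pool.emplace_back([this] { work(); });
        std::thread writer([this, &out] { write_all(out); });
        reader.join();
        for (auto& t : pool) t.join();
        writer.join();
    }

private:
    // Minimal blocking queue of optionals; an empty optional means "no more data".
    template <typename T>
    class Queue {
    public:
        void push(std::optional<T> v) {
            { std::lock_guard<std::mutex> lk(m_); q_.push(std::move(v)); }
            cv_.notify_one();
        }
        std::optional<T> pop() {
            std::unique_lock<std::mutex> lk(m_);
            cv_.wait(lk, [this] { return !q_.empty(); });
            std::optional<T> v = std::move(q_.front());
            q_.pop();
            return v;
        }
    private:
        std::mutex m_;
        std::condition_variable cv_;
        std::queue<std::optional<T>> q_;
    };

    void read_all(std::istream& in) {
        for (;;) {
            TIn rec;
            if (!get_(in, rec)) break;
            in_q_.push(std::move(rec));
        }
        for (unsigned i = 0; i < workers_; ++i) in_q_.push(std::nullopt);  // one per worker
    }

    void work() {
        while (auto rec = in_q_.pop()) out_q_.push(process_(*rec));
        out_q_.push(std::nullopt);  // tell the writer this worker is done
    }

    void write_all(std::ostream& out) {
        for (unsigned done = 0; done < workers_;) {
            if (auto rec = out_q_.pop()) put_(out, *rec);
            else ++done;
        }
    }

    GetFn get_;
    ProcessFn process_;
    PutFn put_;
    unsigned workers_;
    Queue<TIn> in_q_;
    Queue<TOut> out_q_;
};
```

Keeping the queues and callbacks as members removes the long parameter lists the free functions need. As written, an exception thrown by a callback still ends in std::terminate, and the reader does not distinguish a read error from the end of input.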


Context

StackExchange Code Review Q#62126, answer score: 4
