HiveBrain v1.2.0
pattern · csharp · Minor

Performing parallel processing on a file

Submitted by: @import:stackexchange-codereview

Problem

I have some code that reads a file and then does some parallel processing of the data. There are millions of lines in the file and this section of the code is the bottleneck in my program. Any information on how to improve processing times or any other suggestions to improve the code (e.g. error processing, string manipulation, or anything to increase speed) is appreciated. I'm new to parallel processing in .NET.

```
// Read in the source and target file and start solving the strongest path problem.
try
{
    ConcurrentBag strongestPaths = new ConcurrentBag();
    String[] allFileLines = null;
    int maxSize = (int)Math.Floor((double)(Int32.MaxValue / 10000));

    // Allocate memory for the source and target file array
    allFileLines = new String[maxSize];

    using (StreamReader sr = File.OpenText(sourceTargetArg))
    {
        // Find the strongest path for each of the target nodes
        int x = 0;
        while (!sr.EndOfStream)
        {
            allFileLines[x] = sr.ReadLine();
            x++;

            if (x == maxSize || sr.EndOfStream)
            {
                Parallel.For(0, allFileLines.Length, (i, loopState) =>
                {
                    if (allFileLines[i] != null)
                    {
                        try
                        {
                            Node targetNode = getTargetNode(graph, allFileLines[i]);

                            // If the Target Node was not found, do not process this node, and continue
                            if (targetNode != null)
                            {
                                var path = calculator.GetPath(targetNode);
                                String targetPath = String.Empty;
                                // ...
```

Solution

Since you read to the end of the file anyway, I think (you need to measure it) that it will be faster to read everything first (line by line, as you do now) and then parallelize the work once, instead of setting up a new parallel loop for every chunk.

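Also, if you use Parallel.ForEach over the lines you actually read, you can drop the null check on allFileLines[i]: Parallel.For over the whole preallocated array also visits the empty slots, while ForEach only sees real lines.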
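
A minimal sketch of "read everything, then parallelize once", assuming the file fits comfortably in memory. ProcessLine is a hypothetical stand-in for the getTargetNode / calculator.GetPath work, and File.ReadAllLines replaces the manual StreamReader loop:

```csharp
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

public static class ReadThenProcess
{
    // Hypothetical stand-in for getTargetNode + calculator.GetPath;
    // it only normalizes the line so the example runs on its own.
    static string ProcessLine(string line) => line.Trim().ToUpperInvariant();

    public static ConcurrentBag<string> ProcessFile(string path)
    {
        // Read the whole file once...
        string[] lines = File.ReadAllLines(path);

        var results = new ConcurrentBag<string>();

        // ...then start a single parallel loop over exactly the lines that were read.
        // Every element is a real line, so no null check is needed.
        Parallel.ForEach(lines, line =>
        {
            results.Add(ProcessLine(line));
        });

        return results;
    }
}
```

File.ReadLines would instead stream the lines lazily into Parallel.ForEach, which avoids holding the whole file in memory at the cost of some locking around the shared enumerator; measure both.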

Consider using a custom partitioner. Again, you must measure it, but keep in mind that it is sometimes better to have fewer iterations that each process a large chunk of data than many iterations that each process a small chunk.

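Following on from that: if the work inside your loop body is short, a partitioner is the way to get better performance, because otherwise the per-item overhead of invoking the loop delegate can outweigh the work itself.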
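
A sketch of a range partitioner applied to the lines array, assuming the lines have already been read into a string[]. ProcessLine is hypothetical (it just returns the line length) so that the example runs; Partitioner.Create(0, n) chooses a default range size, and the overload with a third rangeSize argument lets you pick it yourself:

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class RangePartitionedProcessing
{
    // Hypothetical cheap per-line work: the kind of body where per-item
    // delegate overhead would otherwise dominate.
    static int ProcessLine(string line) => line.Length;

    public static int[] ProcessAll(string[] lines)
    {
        var results = new int[lines.Length];

        // Partitioner.Create throws for an empty range, so handle it up front.
        if (lines.Length == 0)
            return results;

        // Split [0, lines.Length) into contiguous index ranges; the delegate
        // below runs once per range instead of once per line.
        var ranges = Partitioner.Create(0, lines.Length);

        Parallel.ForEach(ranges, range =>
        {
            // Each range is a Tuple<int, int> covering [Item1, Item2).
            for (int i = range.Item1; i < range.Item2; i++)
            {
                // Each index belongs to exactly one range, so no locking is needed.
                results[i] = ProcessLine(lines[i]);
            }
        });

        return results;
    }
}
```

Writing into distinct slots of a preallocated array also keeps the results in input order, which a ConcurrentBag does not.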

Again, you need to measure it, but it might be faster to collect the strongest paths in a local list per worker (no locking needed) and then merge each local list into the global list, under a lock, when that worker finishes.

For this you need to use this overload:

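```csharp
Parallel.ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);
```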
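
A minimal sketch of the per-worker list idea with that overload. FindPath is a hypothetical placeholder for the real path calculation; each worker fills its own List<string> without locking, and localFinally merges it into the shared list under a lock:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LocalListAggregation
{
    // Hypothetical stand-in for calculator.GetPath(...) formatted as a string.
    static string FindPath(string line) => "path:" + line;

    public static List<string> CollectPaths(IEnumerable<string> lines)
    {
        var allPaths = new List<string>();
        var sync = new object();

        Parallel.ForEach(
            lines,
            // localInit: each worker gets its own list, so adding to it needs no lock.
            () => new List<string>(),
            // body: runs for every line and hands the same local list back.
            (line, loopState, localPaths) =>
            {
                localPaths.Add(FindPath(line));
                return localPaths;
            },
            // localFinally: runs once per worker; merge into the shared list under a lock.
            localPaths =>
            {
                lock (sync)
                {
                    allPaths.AddRange(localPaths);
                }
            });

        return allPaths;
    }
}
```

The lock is taken once per worker rather than once per line; note that allPaths does not preserve the input order.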


Array.Clear after each chunk can also be avoided if you use one big chunk. It is not very time-consuming, but it still has to walk over every element of the array (maxSize is 214,748 here) and set it to null.

Array.Clear in the finally block: in principle, once you set the array to null (and nothing else references it), the GC knows that the array and everything only it references are dead, so clearing it first is redundant. I don't know whether you decided to do the clear after measuring it; if so, ignore this comment.

About exceptions: this may not be useful to you, but it is worth mentioning that you can collect them inside the loop and decide what to do once the loop is complete. Of course, this costs some performance if many exceptions occur, because of the thread-safety overhead of the ConcurrentQueue.

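```csharp
var exceptions = new ConcurrentQueue<Exception>();

Parallel.ForEach(data, _ =>
{
    try { throw new Exception(); } // placeholder for the real loop work
    catch (Exception e) { exceptions.Enqueue(e); }
});

if (!exceptions.IsEmpty)
{
    // handle them here, e.g. rethrow them all together:
    throw new AggregateException(exceptions);
}
```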
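
Applied to this problem, a sketch might record each unknown target node and keep going, then report all of them once the loop is done. The Nodes dictionary is a hypothetical stand-in for graph.GetNode, assumed to throw KeyNotFoundException for an unknown node (as the catch block in getTargetNode suggests):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CollectErrors
{
    // Hypothetical stand-in for the graph: the indexer throws
    // KeyNotFoundException for an unknown node. Concurrent reads are safe
    // because nothing writes to it during the loop.
    static readonly Dictionary<string, int> Nodes = new Dictionary<string, int>
    {
        ["A"] = 1, ["B"] = 2, ["C"] = 3
    };

    public static (ConcurrentBag<int> Found, ConcurrentQueue<Exception> Errors) Run(string[] lines)
    {
        var found = new ConcurrentBag<int>();
        var errors = new ConcurrentQueue<Exception>();

        Parallel.ForEach(lines, line =>
        {
            try
            {
                found.Add(Nodes[line.Trim()]);
            }
            catch (KeyNotFoundException e)
            {
                // Record the failure and keep processing the remaining lines.
                errors.Enqueue(new KeyNotFoundException("Unknown target node: " + line.Trim(), e));
            }
        });

        return (found, errors);
    }
}
```

After the loop, `if (!errors.IsEmpty) throw new AggregateException(errors);` reports every bad line at once instead of stopping at the first one.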


In any case, you need to measure every change, because in this kind of work the speed depends on what your loop body does and on your hardware.

For further reading, see this series.

And you can find a file-reading benchmark here.

About the getTargetNode method:
First, method names should use PascalCase (GetTargetNode).
Second, less important, but take a look at some changes I made:

```csharp
public static Node GetTargetNode(Graph graph, string targetLine)
{
    if (string.IsNullOrEmpty(targetLine))
        throw new ArgumentNullException(nameof(targetLine));

    try
    {
        // Verify that the target node is a node in the graph
        return graph.GetNode(targetLine);
    }
    catch (KeyNotFoundException e)
    {
        throw new KeyNotFoundException("Invalid Input: The Target Node, " + targetLine.Trim() + ", in the Source and Target file is not a node in the graph. ", e);
    }
    catch (Exception e)
    {
        throw new Exception("Invalid Input: The Target Node, " + targetLine.Trim() + ", in the Source and Target file is invalid: " + e.Message, e);
    }
}
```


UPDATE

I'm adding an example of partitioner and localFinally usage:

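```csharp
private static long ParallelPartitionerWithLocal(long from, long to)
{
    long result = 0;
    if (to <= from)
        return result; // Partitioner.Create rejects an empty range

    // Roughly one range per core; rangeSize must be at least 1.
    var partitioner = Partitioner.Create(from, to,
               Math.Max(1, (to - from) / Environment.ProcessorCount));

    Parallel.ForEach(partitioner, () => 0L /*local init*/,
               (range, loopState, subTotal) /*body*/ =>
               {
                   for (var l = range.Item1; l < range.Item2; ++l)
                       subTotal += l;
                   return subTotal;
               },
               subTotal /*local finally*/ => Interlocked.Add(ref result, subTotal));

    return result;
}
```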


Because result is shared by all the worker threads, we can't change it inside the loop without synchronization, so each worker accumulates into its own local variable (subTotal) and adds it to result just once, at the end, with Interlocked.Add.

The partitioner here is simple: it splits the range into chunks based on its length and the number of cores (roughly one chunk per core).

Code Snippets

```csharp
Parallel.ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);
```

```csharp
var exceptions = new ConcurrentQueue<Exception>();

Parallel.ForEach(data, _ =>
{
    try { throw new Exception(); } // placeholder for the real loop work
    catch (Exception e) { exceptions.Enqueue(e); }
});

if (!exceptions.IsEmpty)
{
    // handle them here, e.g. rethrow them all together:
    throw new AggregateException(exceptions);
}
```

```csharp
public static Node GetTargetNode(Graph graph, string targetLine)
{
    if (string.IsNullOrEmpty(targetLine))
        throw new ArgumentNullException(nameof(targetLine));

    try
    {
        // Verify that the target node is a node in the graph
        return graph.GetNode(targetLine);
    }
    catch (KeyNotFoundException e)
    {
        throw new KeyNotFoundException("Invalid Input: The Target Node, " + targetLine.Trim() + ", in the Source and Target file is not a node in the graph. ", e);
    }
    catch (Exception e)
    {
        throw new Exception("Invalid Input: The Target Node, " + targetLine.Trim() + ", in the Source and Target file is invalid: " + e.Message, e);
    }
}
```

```csharp
private static long ParallelPartitionerWithLocal(long from, long to)
{
    long result = 0;
    if (to <= from)
        return result; // Partitioner.Create rejects an empty range

    // Roughly one range per core; rangeSize must be at least 1.
    var partitioner = Partitioner.Create(from, to,
               Math.Max(1, (to - from) / Environment.ProcessorCount));

    Parallel.ForEach(partitioner, () => 0L /*local init*/,
               (range, loopState, subTotal) /*body*/ =>
               {
                   for (var l = range.Item1; l < range.Item2; ++l)
                       subTotal += l;
                   return subTotal;
               },
               subTotal /*local finally*/ => Interlocked.Add(ref result, subTotal));

    return result;
}
```

Context

StackExchange Code Review Q#124038, answer score: 2
