HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

Rewriting to single pipeline by using data flow blocks

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
blocksflowsingleusingrewritingdatapipeline

Problem

I have a number of messages with a payload as a list of items. My goal is to transform the message payload without losing the relation between the message and payloads.

Isn't it possible to rewrite the code to a single pipeline without creating an instance of a nested pipeline for each message?

```
using System.Threading.Tasks.Dataflow;

public class Message
{
public int Id { get; set; }

public IList Items { get; set; }
}

class Program
{
static void Main(string[] args)
{
var rnd = new Random();

var input = new TransformBlock, Message>(x => Transform(x));

var action = new ActionBlock>(
x =>
{
Console.WriteLine("MSG #{0}", x.Id);

foreach (var item in x.Items)
{
Console.WriteLine("\t {0}", item);
}
});

input.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

for (var idx = 1; idx { Id = idx, Items = Enumerable.Range(0, rnd.Next(0, 10)).ToList() };
input.Post(msg);
}

input.Complete();
action.Completion.Wait();

Console.WriteLine("Done");
Console.ReadKey();
}

private static async Task> Transform(Message input)
{
var linkOpts = new DataflowLinkOptions { PropagateCompletion = true };

var itr = new TransformManyBlock, int>(x => x);
var multiply = new TransformBlock(x => x * 10);
var convert = new TransformBlock(x => "STR-" + x);

itr.LinkTo(multiply, linkOpts);
multiply.LinkTo(convert, linkOpts);

var pipeline = DataflowBlock.Encapsulate(itr, convert);

pipeline.Post(input.Items);
pipeline.Complete();

var buffer = new BufferBlock();
pipeline.LinkTo(buffer);

await pipeline.Completion;

IList items;
if (!buffer.TryReceiveAll(out items))
{
items = new List(

Solution

Isn't it possible to rewrite the code to a single pipeline without
creating an instance of a nested pipeline for each message?

It really seems a hard task because you are putting the pipeline building logic into the transform Method. Create a method to build your pipeline, it is really asking for it!

private static PipelineType _pipeline;
private static PipelineType BuildPipeline(Message input){
    if(_pipeline != null){
        _pipeline.Post(input.Items);
        _pipeline.Complete();
        return _pipeline;
    }
    var linkOpts = new DataflowLinkOptions { PropagateCompletion = true };

    var itr = new TransformManyBlock, int>(x => x);
    var multiply = new TransformBlock(x => x * 10);
    var convert = new TransformBlock(x => "STR-" + x);

    itr.LinkTo(multiply, linkOpts);
    multiply.LinkTo(convert, linkOpts);

    _pipeline = DataflowBlock.Encapsulate(itr, convert);

    _pipeline.Post(input.Items);
    _pipeline.Complete();

    var buffer = new BufferBlock();
    _pipeline.LinkTo(buffer);

    return _pipeline;
}


To be able to acess the buffer in your transform method you should set it public. But actually the best option would be that await pipeline.Completion would return the result you need.

Code Snippets

private static PipelineType _pipeline;
private static PipelineType BuildPipeline(Message<int> input){
    if(_pipeline != null){
        _pipeline.Post(input.Items);
        _pipeline.Complete();
        return _pipeline;
    }
    var linkOpts = new DataflowLinkOptions { PropagateCompletion = true };

    var itr = new TransformManyBlock<IList<int>, int>(x => x);
    var multiply = new TransformBlock<int, int>(x => x * 10);
    var convert = new TransformBlock<int, string>(x => "STR-" + x);

    itr.LinkTo(multiply, linkOpts);
    multiply.LinkTo(convert, linkOpts);

    _pipeline = DataflowBlock.Encapsulate(itr, convert);

    _pipeline.Post(input.Items);
    _pipeline.Complete();

    var buffer = new BufferBlock<string>();
    _pipeline.LinkTo(buffer);

    return _pipeline;
}

Context

StackExchange Code Review Q#104993, answer score: 2

Revisions (0)

No revisions yet.