Rewriting to a single pipeline using dataflow blocks
Problem
I have a number of messages, each carrying a list of items as its payload. My goal is to transform each payload without losing the relation between a message and its items.
Is it possible to rewrite the code as a single pipeline, without creating an instance of a nested pipeline for each message?
```
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Message<T>
{
    public int Id { get; set; }
    public IList<T> Items { get; set; }
}

class Program
{
    static void Main(string[] args)
    {
        var rnd = new Random();
        var input = new TransformBlock<Message<int>, Message<string>>(x => Transform(x));
        var action = new ActionBlock<Message<string>>(
            x =>
            {
                Console.WriteLine("MSG #{0}", x.Id);
                foreach (var item in x.Items)
                {
                    Console.WriteLine("\t {0}", item);
                }
            });
        input.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });
        // The loop bound was lost in the source; 10 is a placeholder.
        for (var idx = 1; idx <= 10; idx++)
        {
            var msg = new Message<int> { Id = idx, Items = Enumerable.Range(0, rnd.Next(0, 10)).ToList() };
            input.Post(msg);
        }
        input.Complete();
        action.Completion.Wait();
        Console.WriteLine("Done");
        Console.ReadKey();
    }

    private static async Task<Message<string>> Transform(Message<int> input)
    {
        // A new nested pipeline is built for every message.
        var linkOpts = new DataflowLinkOptions { PropagateCompletion = true };
        var itr = new TransformManyBlock<IList<int>, int>(x => x);
        var multiply = new TransformBlock<int, int>(x => x * 10);
        var convert = new TransformBlock<int, string>(x => "STR-" + x);
        itr.LinkTo(multiply, linkOpts);
        multiply.LinkTo(convert, linkOpts);
        var pipeline = DataflowBlock.Encapsulate(itr, convert);
        pipeline.Post(input.Items);
        pipeline.Complete();
        var buffer = new BufferBlock<string>();
        pipeline.LinkTo(buffer);
        await pipeline.Completion;
        IList<string> items;
        if (!buffer.TryReceiveAll(out items))
        {
            items = new List<string>();
        }
        // ... (the rest of the listing is cut off in the source)
```
Solution
> Isn't it possible to rewrite the code to a single pipeline without creating an instance of a nested pipeline for each message?

That is hard to do as long as the pipeline-building logic sits inside the Transform method. Create a separate method to build your pipeline; the code is really asking for it.
```
// PipelineType is a placeholder; Encapsulate returns
// IPropagatorBlock<IList<int>, string> for these blocks.
private static PipelineType _pipeline;

private static PipelineType BuildPipeline(Message<int> input)
{
    if (_pipeline != null)
    {
        // Note: a block that has already been completed declines further Post calls.
        _pipeline.Post(input.Items);
        _pipeline.Complete();
        return _pipeline;
    }

    var linkOpts = new DataflowLinkOptions { PropagateCompletion = true };
    var itr = new TransformManyBlock<IList<int>, int>(x => x);
    var multiply = new TransformBlock<int, int>(x => x * 10);
    var convert = new TransformBlock<int, string>(x => "STR-" + x);
    itr.LinkTo(multiply, linkOpts);
    multiply.LinkTo(convert, linkOpts);
    _pipeline = DataflowBlock.Encapsulate(itr, convert);
    _pipeline.Post(input.Items);
    _pipeline.Complete();
    var buffer = new BufferBlock<string>();
    _pipeline.LinkTo(buffer);
    return _pipeline;
}
```
To be able to access the buffer in your Transform method, you would have to expose it (make it public). Actually, though, the best option would be for await pipeline.Completion to return the result you need.
Code Snippets
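One possible reading of "a single pipeline", sketched here under assumptions rather than taken from the answer: build the blocks once and let every stage map a whole Message to a whole Message, so the Id travels with its items and no per-message pipeline is needed. The class SinglePipelineSketch and its Run method are hypothetical names, and the per-item work is done with LINQ inside each block instead of in separate item-level blocks.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public class Message<T>
{
    public int Id { get; set; }
    public IList<T> Items { get; set; }
}

// Hypothetical illustration; not part of the original question or answer.
public static class SinglePipelineSketch
{
    // Builds the pipeline once and pushes every message through it.
    public static List<Message<string>> Run(IEnumerable<Message<int>> messages)
    {
        var linkOpts = new DataflowLinkOptions { PropagateCompletion = true };

        // Each stage maps a whole message to a whole message, so Id stays attached.
        var multiply = new TransformBlock<Message<int>, Message<int>>(m =>
            new Message<int> { Id = m.Id, Items = m.Items.Select(x => x * 10).ToList() });
        var convert = new TransformBlock<Message<int>, Message<string>>(m =>
            new Message<string> { Id = m.Id, Items = m.Items.Select(x => "STR-" + x).ToList() });

        var results = new List<Message<string>>();
        // An ActionBlock handles one message at a time by default, so no lock is used here.
        var collect = new ActionBlock<Message<string>>(m => results.Add(m));

        multiply.LinkTo(convert, linkOpts);
        convert.LinkTo(collect, linkOpts);

        foreach (var message in messages)
        {
            multiply.Post(message);
        }
        multiply.Complete();
        collect.Completion.Wait();
        return results;
    }

    public static void Main()
    {
        var input = Enumerable.Range(1, 3)
            .Select(id => new Message<int> { Id = id, Items = Enumerable.Range(0, id).ToList() });
        foreach (var message in Run(input))
        {
            Console.WriteLine("MSG #{0}: {1}", message.Id, string.Join(", ", message.Items));
        }
    }
}
```

The trade-off in this sketch is that parallelism is per message rather than per item. Setting MaxDegreeOfParallelism through ExecutionDataflowBlockOptions on the transform blocks would be one way to recover some of it.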
```
// PipelineType is a placeholder; Encapsulate returns
// IPropagatorBlock<IList<int>, string> for these blocks.
private static PipelineType _pipeline;

private static PipelineType BuildPipeline(Message<int> input)
{
    if (_pipeline != null)
    {
        // Note: a block that has already been completed declines further Post calls.
        _pipeline.Post(input.Items);
        _pipeline.Complete();
        return _pipeline;
    }

    var linkOpts = new DataflowLinkOptions { PropagateCompletion = true };
    var itr = new TransformManyBlock<IList<int>, int>(x => x);
    var multiply = new TransformBlock<int, int>(x => x * 10);
    var convert = new TransformBlock<int, string>(x => "STR-" + x);
    itr.LinkTo(multiply, linkOpts);
    multiply.LinkTo(convert, linkOpts);
    _pipeline = DataflowBlock.Encapsulate(itr, convert);
    _pipeline.Post(input.Items);
    _pipeline.Complete();
    var buffer = new BufferBlock<string>();
    _pipeline.LinkTo(buffer);
    return _pipeline;
}
```
Context
StackExchange Code Review Q#104993, answer score: 2