patterncsharpMinor
BufferAggregate
Viewed 0 times
bufferaggregatestackoverflowprogramming
Problem
In the ongoing saga of me rewriting functions that can be easily composed from existing Rx functions, I present my latest progeny: a bastard combination of Scan and Aggregate. The aim is to aggregate n items from a source, and emit the result every n. Then start again.
Note that there is a good reason for not using pure Buffer, which is that the data items are rather large, so it is preferable to calculate the aggregate as we go along, rather than doing it all at the end.
Am I complicating things too much?
public static IObservable BufferAggregate(this IObservable source,
Func aggregator, int size)
{
return Observable.Create(observer =>
{
var count = 0;
var current = default(T);
return source.Subscribe(t =>
{
count++;
if (count == 1)
current = t;
else if (count < size)
current = aggregator(current, t);
else // (count == size)
{
observer.OnNext(aggregator(current, t));
count = 0;
}
});
});
}Note that there is a good reason for not using pure Buffer, which is that the data items are rather large, so it is preferable to calculate the aggregate as we go along, rather than doing it all at the end.
Am I complicating things too much?
Solution
-
Because the objects which are represented by
-
the method argument
-
you should use
-
the else part
Implementing the mentioned points will lead to
Because the objects which are represented by
T are the providers of notifications I would like to suggest to rename current -> currentProvider and t -> provider which makes it more clear what they are about, neither current nor t are good names. -
the method argument
size should be renamed as well. Because you said in the question The aim is to aggregate n items from a source, and emit the result every n. the argument should be name blockSize. -
you should use
braces although they might be optional because braceophilia can lead to serious hard to track bugs. -
the else part
else // (count == size) looks like you had an else if but decided to skip it in favour of the else. Although you save one comparision in the way you have it now, I would like to suggest to revert this to an else if wich makes the intent more clear. Implementing the mentioned points will lead to
public static IObservable BufferAggregate(this IObservable source,
Func aggregator, int blockSize)
{
return Observable.Create(observer =>
{
var count = 0;
var currentProvider = default(T);
return source.Subscribe(provider =>
{
count++;
if (count == 1)
{
currentProvider = provider;
}
else if (count < blockSize)
{
currentProvider = aggregator(currentProvider, provider);
}
else if (count == blockSize)
{
observer.OnNext(aggregator(currentProvider, provider));
count = 0;
}
});
});
}Code Snippets
public static IObservable<T> BufferAggregate<T>(this IObservable<T> source,
Func<T, T, T> aggregator, int blockSize)
{
return Observable.Create<T>(observer =>
{
var count = 0;
var currentProvider = default(T);
return source.Subscribe(provider =>
{
count++;
if (count == 1)
{
currentProvider = provider;
}
else if (count < blockSize)
{
currentProvider = aggregator(currentProvider, provider);
}
else if (count == blockSize)
{
observer.OnNext(aggregator(currentProvider, provider));
count = 0;
}
});
});
}Context
StackExchange Code Review Q#113255, answer score: 4
Revisions (0)
No revisions yet.