'Buffered' function for Rx
Problem
My application has a whole chain of Observables running through it, and recently I found that something was slowing down my (hot) source, but I couldn't work out where.
Obviously, with everything running in series by default, a good first step was to introduce some 'parallelism', so after doing battle with my own implementation, I discovered that `ObserveOn` did what I wanted: it allows a source to 'carry on' while a subscriber does whatever it needs to.
But that didn't solve my problem of knowing where the slow bits were, which led me to try to invent a `Buffered` function that does the buffering but also tells me how long the buffer is.
I'm aware that this doesn't do all the fancy Rx business of giving you the choice of which scheduler and all that, but for my purposes a Task is currently good enough (unless you tell me otherwise).
This is the tuple used to return data in:
```
public class WithBufferSize<T>
{
    public readonly T Value;
    public readonly int BufferSize;
    public WithBufferSize(T value, int bufferSize)
    {
        Value = value;
        BufferSize = bufferSize;
    }
}
```
Here is the function:
```
public static IObservable<WithBufferSize<T>> Buffered<T>(this IObservable<T> source)
{
    return Observable.Create<WithBufferSize<T>>(observer =>
    {
        var buffer = new BlockingCollection<T>();
        Exception exception = null;
        bool completed = false;
        var task = new Task(() =>
        {
            foreach(var t in buffer.GetConsumingEnumerable())
                observer.OnNext(new WithBufferSize<T>(t, buffer.Count));
            if(exception != null)
                observer.OnError(exception);
            else if(completed)
                observer.OnCompleted();
        }, TaskCreationOptions.LongRunning);
        task.Start();
        var subscription = source.Subscribe(t => buffer.Add(t),
            ex =>
            {
                exception = ex;
                buffer.CompleteAdding();
            },
            () =>
            {
                completed = true;
                buffer.Comp
```
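The core idea in the question (a producer that only enqueues, and a consumer that reports how many items were still waiting when it picked one up) can be tried without Rx at all. The following is a minimal sketch using only the BCL; `BufferDepthSketch` and `Drain` are hypothetical names made up for illustration, not part of the question's code or of any library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BufferDepthSketch
{
    // Hypothetical helper (not from the question): feeds `items` through a
    // BlockingCollection and pairs each consumed item with the number of
    // items still waiting in the buffer at the moment it was taken.
    public static List<KeyValuePair<T, int>> Drain<T>(IEnumerable<T> items)
    {
        var results = new List<KeyValuePair<T, int>>();
        using (var buffer = new BlockingCollection<T>())
        {
            // Consumer: blocks until items arrive and ends once
            // CompleteAdding() has been called and the buffer is empty.
            var consumer = Task.Factory.StartNew(() =>
            {
                foreach (var item in buffer.GetConsumingEnumerable())
                    results.Add(new KeyValuePair<T, int>(item, buffer.Count));
            }, TaskCreationOptions.LongRunning);

            // Producer: never waits for the consumer, it only enqueues.
            foreach (var item in items)
                buffer.Add(item);
            buffer.CompleteAdding();

            // results is only read after the consumer has finished.
            consumer.Wait();
        }
        return results;
    }
}
```

Calling `BufferDepthSketch.Drain(new[] { 10, 20, 30 })` yields the items in insertion order, since a `BlockingCollection<T>` created with the default constructor is backed by a FIFO `ConcurrentQueue<T>`. The recorded depths depend on thread timing, so they will vary from run to run.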
Solution
Don't expose fields, even if they're readonly:
```
public readonly T Value;
public readonly int BufferSize;
```
Instead expose properties:
```
public T Value { get; private set; }
public int BufferSize { get; private set; }
```
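For reference, here is a sketch of how the whole class might look once the fields become properties. It assumes C# 6 or later and uses get-only auto-properties, a slightly stricter variant of the `private set` form above. The `ToString` override is an addition for logging convenience, not something the original class had.

```csharp
using System;

// Sketch only: same shape as the question's class, with properties
// instead of public fields. Requires C# 6+ for get-only auto-properties.
public class WithBufferSize<T>
{
    // Assignable only from the constructor, so instances stay immutable.
    public T Value { get; }
    public int BufferSize { get; }

    public WithBufferSize(T value, int bufferSize)
    {
        Value = value;
        BufferSize = bufferSize;
    }

    // Not in the original: a readable form for logging buffer depth.
    public override string ToString()
    {
        return string.Format("{0} (buffer: {1})", Value, BufferSize);
    }
}
```

On older compilers, the `{ get; private set; }` form from the answer gives the same public surface.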
Context
StackExchange Code Review Q#110892, answer score: 2