CancellationToken and Reactive Extensions

Submitted by: @import:stackexchange-codereview

Problem

I have a couple of calls where I need to cancel a subscription based on a CancellationToken. I saw "How to cancel an observable sequence", but I needed a more generic way. I created an extension method that seems to work.

public static IObservable<T> TakeUntil<T>(this IObservable<T> source, CancellationToken token)
{
    // Emits a single pulse when the token is cancelled.
    var cancellationObservable =
        Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
    return source.TakeUntil(cancellationObservable);
}


Is there a built-in way in Rx to cover this? Is Observable.Create the best way to trigger the pulse?

This seems so basic that I feel I may be missing something.

Solution

"I need to cancel a subscription"

If you're actually subscribing to the source using Subscribe(), note that it has overloads that accept a CancellationToken; you might want to consider using one of those.
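A minimal sketch of that suggestion, assuming the System.Reactive package. The Subject, the list, and the variable names are hypothetical and only there to make the effect visible. It uses the Subscribe(onNext, token) overload, which returns void and ties the subscription's lifetime to the token:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading;

var cts = new CancellationTokenSource();
var source = new Subject<int>();   // hypothetical stand-in for the real source
var received = new List<int>();

// This overload returns void; cancelling the token disposes the subscription.
source.Subscribe(x => received.Add(x), cts.Token);

source.OnNext(1);   // delivered
cts.Cancel();       // the subscription is torn down here
source.OnNext(2);   // no longer delivered

Console.WriteLine(string.Join(", ", received));   // prints "1"
```

Unlike TakeUntil, this does not signal OnCompleted to the observer; the subscription is simply disposed, which may or may not be what the calling code wants.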

Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));


It took me a while to figure out that this doesn't cause a memory leak. I thought that since you call Register(), but never use the returned IDisposable to unregister, the registrations will stay there even when they are no longer needed. But that's not true, because the IDisposable is actually returned to Create(), which will Dispose() it when it's no longer needed.

I think this is confusing enough that it warrants a comment explaining the situation.
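The disposal behaviour described above can be checked with a small sketch, assuming the System.Reactive package. The flag and variable names are hypothetical. The subscription is disposed before the token is cancelled, so the registered callback should already be gone by the time Cancel() runs:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

var cts = new CancellationTokenSource();
var callbackRan = false;

var pulse = Observable.Create<Unit>(observer =>
    // The CancellationTokenRegistration returned by Register() becomes the
    // subscription's IDisposable, so Rx disposes it on unsubscribe.
    cts.Token.Register(() =>
    {
        callbackRan = true;
        observer.OnNext(Unit.Default);
    }));

var subscription = pulse.Subscribe(_ => { });
subscription.Dispose();   // also unregisters the token callback
cts.Cancel();             // nothing is left to invoke

Console.WriteLine(callbackRan);   // prints "False"
```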

Also, the common way to express the void type in generics in Rx is the Unit type, not object:

Observable.Create<Unit>(observer => token.Register(() => observer.OnNext(Unit.Default)));


This is longer than your code, but I think it's also clearer.
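Putting both suggestions together, the extension method might look like the sketch below. It assumes the System.Reactive package; the class name is hypothetical, since the question only shows the method itself.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical class name; the question only shows the method.
public static class ObservableCancellationExtensions
{
    public static IObservable<T> TakeUntil<T>(
        this IObservable<T> source, CancellationToken token)
    {
        // Register() returns a CancellationTokenRegistration. Handing it back
        // to Create() makes it the subscription's disposable, so the callback
        // is unregistered when the subscription ends and nothing leaks.
        var cancellationObservable = Observable.Create<Unit>(observer =>
            token.Register(() => observer.OnNext(Unit.Default)));

        return source.TakeUntil(cancellationObservable);
    }
}
```

A call site would then read something like source.TakeUntil(token).Subscribe(...), with the sequence completing once the token is cancelled.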

Code Snippets

Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
Observable.Create<Unit>(observer => token.Register(() => observer.OnNext(Unit.Default)));

Context

StackExchange Code Review Q#67597, answer score: 6
