CancellationToken and Reactive Extensions
Problem
I have a couple of calls where I need to cancel a subscription based on a CancellationToken. I saw "How to cancel an observable sequence", but I needed a more generic way, so I created an extension method that seems to work.
Is there a built-in way in Rx to cover this? Is Observable.Create the best way to trigger the pulse?
This seems so basic that I feel I may be missing something.
public static IObservable<T> TakeUntil<T>(this IObservable<T> source, CancellationToken token)
{
    var cancelationObservable =
        Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
    return source.TakeUntil(cancelationObservable);
}
Solution
"I need to cancel a subscription"
If you're actually subscribing to the source using Subscribe(), note that it has overloads that accept a CancellationToken; you might want to consider using one of those.
Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
It took me a while to figure out that this doesn't cause a memory leak. I thought that since you call Register() but never use the returned IDisposable to unregister, the registrations would stay there even when they are no longer needed. But that's not true, because the IDisposable is actually returned to Create(), which will Dispose() it when it's no longer needed.
I think this is confusing enough that it warrants a comment explaining the situation.
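If you want to convince yourself that the registration really is released, a small experiment along the following lines might help. This is only a sketch, not part of the reviewed code: the class name RegistrationDemo, the method CallbackRanAfterUnsubscribe and the flag callbackRan are made up for illustration, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

public static class RegistrationDemo
{
    // Returns true if the token callback still ran after the subscription was disposed.
    public static bool CallbackRanAfterUnsubscribe()
    {
        using (var cts = new CancellationTokenSource())
        {
            var callbackRan = false;

            // Same shape as the code under review: the CancellationTokenRegistration
            // returned by Register() is handed to Create() as the subscription's IDisposable.
            IObservable<Unit> cancellation = Observable.Create<Unit>(observer =>
                cts.Token.Register(() =>
                {
                    callbackRan = true;
                    observer.OnNext(Unit.Default);
                }));

            IDisposable subscription = cancellation.Subscribe(_ => { });

            // Disposing the subscription disposes the registration as well...
            subscription.Dispose();

            // ...so cancelling afterwards should no longer invoke the callback.
            cts.Cancel();

            return callbackRan;
        }
    }

    public static void Main()
    {
        // Should print False if the registration was unregistered on dispose.
        Console.WriteLine(CallbackRanAfterUnsubscribe());
    }
}
```

If the registration were leaked, the callback would still fire on Cancel() and the flag would be set.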
Also, the common way to express the void type in generics in Rx is the Unit type, not object:
Observable.Create<Unit>(observer => token.Register(() => observer.OnNext(Unit.Default)));
This is longer than your code, but I think it's also clearer.
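Putting both suggestions together, the extension method might end up looking roughly like the sketch below. The method name TakeUntilCancelled is hypothetical, chosen here only so the example cannot clash with Rx's own TakeUntil overloads; the class names and the Subject-based demo are likewise assumptions for illustration, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class CancellationObservableExtensions
{
    // Hypothetical name; the reviewed code calls this method TakeUntil.
    public static IObservable<T> TakeUntilCancelled<T>(
        this IObservable<T> source, CancellationToken token)
    {
        // No leak here: the CancellationTokenRegistration returned by Register()
        // becomes the subscription's IDisposable, so Create() disposes it
        // (and thereby unregisters the callback) when the subscription ends.
        IObservable<Unit> cancellation = Observable.Create<Unit>(observer =>
            token.Register(() => observer.OnNext(Unit.Default)));

        return source.TakeUntil(cancellation);
    }
}

public static class TakeUntilCancelledDemo
{
    // Pushes three values through a subject and cancels after the second one.
    public static List<int> Run()
    {
        var received = new List<int>();
        using (var cts = new CancellationTokenSource())
        {
            var subject = new Subject<int>();
            subject.TakeUntilCancelled(cts.Token).Subscribe(x => received.Add(x));

            subject.OnNext(1);
            subject.OnNext(2);
            cts.Cancel();      // fires the pulse, which completes the sequence
            subject.OnNext(3); // arrives after cancellation and should be dropped
        }
        return received;
    }

    public static void Main()
    {
        // Should list only the values seen before cancellation: 1, 2
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

One design difference worth keeping in mind: with TakeUntil the observer receives OnCompleted when the token is cancelled, whereas the Subscribe() overloads that take a CancellationToken simply unsubscribe without signalling completion.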
Code Snippets
Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
Observable.Create<Unit>(observer => token.Register(() => observer.OnNext(Unit.Default)));
Context
StackExchange Code Review Q#67597, answer score: 6