Create an IObservable from a method
Problem
I have a method that performs a long-running query search against some data:

    Task<List<SearchResult>> Search(string query) { ... }

I have tried various ways to create an `IObservable` form of it, but failed. In the end, after some help from Google, I wrote the following method:

    private IObservable<SearchResult[]> Search(string query)
    {
        return Observable.Create((IObserver<SearchResult[]> observer) =>
        {
            List<SearchResult> result = new List<SearchResult>();
            foreach (TestsGroupMeta group in Engine.Groups)
            {
                string name = group.ToString();
                if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                {
                    result.Add(new SearchResult { Name = name, Type = "Group" });
                }
                foreach (TestMethodMeta method in group.Methods)
                {
                    name = method.ToString();
                    if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                    {
                        result.Add(new SearchResult { Name = name, Type = "Method" });
                    }
                }
            }
            observer.OnNext(result.ToArray());
            observer.OnCompleted();
            return () => {};
        });
    }

Is such a method correctly written? If not, what would be correct?
Solution
Using the `TaskObservableExtensions` from `System.Reactive.Threading.Tasks` (in the Rx Linq DLL), it is easy:

    IObservable<List<SearchResult>> observable = Search(queryString).ToObservable();

I'd consider the code there to be "correctly written" (I'm in no way linked to the code nor its authors).
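
To show what that one-liner gives a caller, here is a small sketch of consuming the converted task. It assumes the System.Reactive NuGet package is referenced. The `SearchResult` class, the stubbed `Search` method and the `TaskToObservableDemo` class are hypothetical stand-ins for the real code in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's result type.
public class SearchResult
{
    public string Name { get; set; }
    public string Type { get; set; }
}

public static class TaskToObservableDemo
{
    // Hypothetical stub for the existing Task-based Search method.
    public static Task<List<SearchResult>> Search(string query)
    {
        return Task.Run(() => new List<SearchResult>
        {
            new SearchResult { Name = query + "Tests", Type = "Group" }
        });
    }

    public static void Main()
    {
        IObservable<List<SearchResult>> observable = Search("Login").ToObservable();

        // The whole list arrives as a single OnNext, followed by OnCompleted.
        // Wait() blocks until completion and returns that last (only) element.
        List<SearchResult> results = observable
            .Do(list => Console.WriteLine("Received {0} result(s)", list.Count))
            .Wait();

        Console.WriteLine(results[0].Name);
    }
}
```

Note that the observer still receives everything at once here; the conversion changes the shape of the API, not when the results become available.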

If you want to rewrite your `Search` method to work directly with observables, I'd recommend using their full power:

    private IObservable<SearchResult> Search(string query)
    {
        return Observable.Create<SearchResult>(
            (observer, cancellationToken) => Task.Factory.StartNew(
                () =>
                {
                    foreach (TestsGroupMeta group in Engine.Groups)
                    {
                        // Stop searching once the subscriber has unsubscribed.
                        if (cancellationToken.IsCancellationRequested)
                        {
                            break;
                        }
                        string name = group.ToString();
                        if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                        {
                            // Push each match as soon as it is found.
                            observer.OnNext(new SearchResult { Name = name, Type = "Group" });
                        }
                        foreach (TestMethodMeta method in group.Methods)
                        {
                            name = method.ToString();
                            if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                            {
                                observer.OnNext(new SearchResult { Name = name, Type = "Method" });
                            }
                        }
                    }
                    observer.OnCompleted();
                },
                cancellationToken,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default));
    }

An observer is now able to process `SearchResult`s as soon as they are found (as opposed to waiting for the completion of a computation that yields all results at once) and can unsubscribe at any time, which in turn cancels the result computation. If some function still needs all `SearchResult`s at once, there's always `Observable.ToEnumerable` and friends. In addition, possible parallelization is now obvious (e.g. using `Parallel.ForEach`) but still not trivial (hinted at by the inclusion of `TaskCreationOptions` and `TaskScheduler`; sensible options for `Parallel.ForEach` are not included). Clients must then also be prepared to consume 'out-of-order' search results.

Potential further improvements:
- check for cancellation at more / more appropriate places
- split Task/Observable logic and searching
- handle (now asynchronous) errors (e.g. by `try`ing the `foreach` and calling `observer.OnError`)
- specify a culture for `ToString`
- employ a more 'functional' style by leveraging `SelectMany` (suits my personal taste)
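
Several of these points can be combined in one sketch: the matching logic becomes a plain lazy sequence built with `SelectMany`, and a separate helper pushes that sequence to an observer, checking for cancellation once per item and reporting failures through `OnError`. Only BCL types are used. `GroupMeta`, `SearchResult`, `SearchLogic`, `FindMatches` and `Publish` are hypothetical names (the real `TestsGroupMeta` and `TestMethodMeta` classes are not shown in the question), and the ordinal comparison is just one possible choice.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical stand-ins for the question's types.
public class SearchResult
{
    public string Name { get; set; }
    public string Type { get; set; }
}

public class GroupMeta
{
    public string Name { get; set; }
    public List<string> Methods { get; set; }
}

public static class SearchLogic
{
    // Pure, lazy search: no Task or observable concerns in here.
    // Each group yields itself first, then its methods; non-matches are filtered out.
    public static IEnumerable<SearchResult> FindMatches(IEnumerable<GroupMeta> groups, string query)
    {
        return groups
            .SelectMany(g => new[] { new SearchResult { Name = g.Name, Type = "Group" } }
                .Concat(g.Methods.Select(m => new SearchResult { Name = m, Type = "Method" })))
            .Where(r => r.Name.IndexOf(query, StringComparison.OrdinalIgnoreCase) != -1);
    }

    // Pushes a sequence to an observer. Exceptions thrown while enumerating
    // (or by OnNext itself) are forwarded to OnError instead of escaping.
    public static void Publish<T>(IEnumerable<T> source, IObserver<T> observer, CancellationToken token)
    {
        try
        {
            foreach (T item in source)
            {
                // Checked once per item; a cancelled run ends without OnCompleted.
                if (token.IsCancellationRequested) return;
                observer.OnNext(item);
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            return;
        }
        observer.OnCompleted();
    }
}
```

With helpers like these, the task body inside `Observable.Create` would reduce to a single call along the lines of `SearchLogic.Publish(SearchLogic.FindMatches(groups, query), observer, cancellationToken)`, and `FindMatches` can be unit-tested without any Rx machinery.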

Otherwise you could at least use `Observable.Start`, which gets rid of the otherwise unused observer:

    private IObservable<SearchResult[]> Search(string query)
    {
        return Observable.Start(() =>
        {
            List<SearchResult> result = new List<SearchResult>();
            foreach (TestsGroupMeta group in Engine.Groups)
            {
                string name = group.ToString();
                if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                {
                    result.Add(new SearchResult { Name = name, Type = "Group" });
                }
                foreach (TestMethodMeta method in group.Methods)
                {
                    name = method.ToString();
                    if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                    {
                        result.Add(new SearchResult { Name = name, Type = "Method" });
                    }
                }
            }
            // ToArray so the element type matches the declared IObservable<SearchResult[]>.
            return result.ToArray();
        });
    }

Code Snippets

    IObservable<List<SearchResult>> observable = Search(queryString).ToObservable();

    private IObservable<SearchResult> Search(string query)
    {
        return Observable.Create<SearchResult>(
            (observer, cancellationToken) => Task.Factory.StartNew(
                () =>
                {
                    foreach (TestsGroupMeta group in Engine.Groups)
                    {
                        if (cancellationToken.IsCancellationRequested)
                        {
                            break;
                        }
                        string name = group.ToString();
                        if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                        {
                            observer.OnNext(new SearchResult { Name = name, Type = "Group" });
                        }
                        foreach (TestMethodMeta method in group.Methods)
                        {
                            name = method.ToString();
                            if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                            {
                                observer.OnNext(new SearchResult { Name = name, Type = "Method" });
                            }
                        }
                    }
                    observer.OnCompleted();
                },
                cancellationToken,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default));
    }

    private IObservable<SearchResult[]> Search(string query)
    {
        return Observable.Start(() =>
        {
            List<SearchResult> result = new List<SearchResult>();
            foreach (TestsGroupMeta group in Engine.Groups)
            {
                string name = group.ToString();
                if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                {
                    result.Add(new SearchResult { Name = name, Type = "Group" });
                }
                foreach (TestMethodMeta method in group.Methods)
                {
                    name = method.ToString();
                    if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                    {
                        result.Add(new SearchResult { Name = name, Type = "Method" });
                    }
                }
            }
            // ToArray so the element type matches the declared IObservable<SearchResult[]>.
            return result.ToArray();
        });
    }

Context
StackExchange Code Review Q#29663, answer score: 4