HiveBrain v1.2.0
Get Started
← Back to all entries
snippetcsharpMinor

Create an IObservable from a method

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
fromiobservablecreatemethod

Problem

I have such a method that performs a long query search against some data.

Task> Search(string query){ ... }


I have tried various way to create an IObservable form of it but failed. At the end after some google help I wrote such a method.

private IObservable Search(string query)
{
    return Observable.Create((IObserver observer)=>
        {
            List result = new List();
            foreach (TestsGroupMeta group in Engine.Groups)
            {
                string name = group.ToString();
                if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                {
                    result.Add(new SearchResult{ Name = name, Type = "Group"});
                }

                foreach (TestMethodMeta method in group.Methods)
                {
                    name = method.ToString();
                    if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                    {
                        result.Add(new SearchResult {Name = name, Type = "Method"});
                    }
                }
            }

            observer.OnNext(result.ToArray());
            observer.OnCompleted();

            return () => {};
        });
}


Is such a method correctly written? If not, what would be correct?

Solution

using the TaskObservableExtensions from System.Reactive.Threading.Tasks (in the Rx Linq DLL) it is easy:

IObservable> observable = Search(queryString).ToObservable()


I'd consider the code there to be "correctly written" (I'm in no way linked to the code nor its authors.)

If you want to rewrite your Search method to work directly with Observables I'd recommend using their full power:

private IObservable Search(string query)
    {
        return Observable.Create(
            (observer, cancellationToken) => Task.Factory.StartNew(
                () =>
                {
                    foreach (TestsGroupMeta group in Engine.Groups)
                    {
                        if (cancellationToken.IsCancellationRequested)
                        {
                            break;
                        }

                        string name = group.ToString();
                        if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                        {
                            observer.OnNext(new SearchResult { Name = name, Type = "Group" });
                        }

                        foreach (TestMethodMeta method in group.Methods)
                        {
                            name = method.ToString();
                            if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                            {
                                observer.OnNext(new SearchResult { Name = name, Type = "Method" });
                            }
                        }
                    }

                    observer.OnCompleted();
                },
                cancellationToken,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default));
    }


An observer is now able to process SearchResults as soon as they are found (as opposed to wait for the completion of the computation yielding all results at once) and can unsubscribe anytime which in turn will cause a cancellation of the result computation. If some function still needs all SearchResultsat once there's always Observable.ToEnumerable and friends. In addition, possible parallelization is now obvious (e.g. using Parallel.ForEach) but still not trivial (hinted at by the inclusion of TaskCreationOptions and TaskScheduler, sensible options for Parallel.ForEach not included). Also clients must then be prepared to consume 'out-of-order' search results.

Potential further improvements:

  • check for cancellation at more / more appropriate places



  • split Task/Observable logic and searching



  • handle (now asynchronous) errors (e.g. by trying the foreach and calling observer.OnError)



  • specify culture forToString



  • employ a more 'functional' style by leveraging SelectMany (suits my personal taste)



Otherwise you could at least use Observable.Start which gets rid of the otherwise unused observer:

private IObservable Search(string query)
{
    return Observable.Start(() =>
    {
        List result = new List();
        foreach (TestsGroupMeta group in Engine.Groups)
        {
            string name = group.ToString();
            if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
            {
                result.Add(new SearchResult { Name = name, Type = "Group" });
            }

            foreach (TestMethodMeta method in group.Methods)
            {
                name = method.ToString();
                if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                {
                    result.Add(new SearchResult { Name = name, Type = "Method" });
                }
            }
        }
        return result;
    });
}

Code Snippets

IObservable<List<SearchResult>> observable = Search(queryString).ToObservable()
private IObservable<SearchResult> Search(string query)
    {
        return Observable.Create<SearchResult>(
            (observer, cancellationToken) => Task.Factory.StartNew(
                () =>
                {
                    foreach (TestsGroupMeta group in Engine.Groups)
                    {
                        if (cancellationToken.IsCancellationRequested)
                        {
                            break;
                        }

                        string name = group.ToString();
                        if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                        {
                            observer.OnNext(new SearchResult { Name = name, Type = "Group" });
                        }

                        foreach (TestMethodMeta method in group.Methods)
                        {
                            name = method.ToString();
                            if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                            {
                                observer.OnNext(new SearchResult { Name = name, Type = "Method" });
                            }
                        }
                    }

                    observer.OnCompleted();
                },
                cancellationToken,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default));
    }
private IObservable<SearchResult[]> Search(string query)
{
    return Observable.Start(() =>
    {
        List<SearchResult> result = new List<SearchResult>();
        foreach (TestsGroupMeta group in Engine.Groups)
        {
            string name = group.ToString();
            if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
            {
                result.Add(new SearchResult { Name = name, Type = "Group" });
            }

            foreach (TestMethodMeta method in group.Methods)
            {
                name = method.ToString();
                if (name.IndexOf(query, StringComparison.InvariantCultureIgnoreCase) != -1)
                {
                    result.Add(new SearchResult { Name = name, Type = "Method" });
                }
            }
        }
        return result;
    });
}

Context

StackExchange Code Review Q#29663, answer score: 4

Revisions (0)

No revisions yet.