HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

ObservablePriorityQueue<T> Implementation

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
implementationobservablepriorityqueuestackoverflow

Problem

I have a requirement in my current project that will need a Prioritised Queue that supports the IObservable interface. Please notify me of any problems with the implementation that I currently have:

ObservablePriorityQueue

```
public sealed class ObservablePriorityQueue : IQueue, IObservable where T : IPrioritised
{
#region "IObservable Implementation"
// A list of the subscribers for the IObservable implementation
List> _subscribers = new List>(10);

Object _observableSyncLock = new Object();

#region "Interface specific"
public IDisposable Subscribe(IObserver observer)
{
if (observer == null)
{
throw new ArgumentNullException("The observer cannot be null.");
}

if (!_subscribers.Contains(observer))
{
lock (_observableSyncLock)
{
_subscribers.Add(observer);
}
}

return new Disposable(() =>
{
this.Unsubscribe(observer);
});
}
#endregion

public void Unsubscribe(IObserver observer)
{
if (observer == null)
{
throw new ArgumentNullException("The observer cannot be null.");
}

observer.OnCompleted();

if (_subscribers.Contains(observer))
{
// remove the entry, but don't dispose it just
// in case they want to re-subscribe with the same observer later
lock (_observableSyncLock)
{
_subscribers.Remove(observer);
}
}
}
#endregion

#region "IQueue Implementation"

readonly List _data = new List(100);
Object _queueSyncLock = new Object();

public void Enqueue(T value)
{
if (value == null)
{
throw new ArgumentException("The item to be enqueued cannot be null");
}

lock (_queueSyncLock)
{
_data.Add(value);
}

// now that the entry has been

Solution


  • At first, I was confused what exactly did the implementation of IObservable mean. You should properly document that.



  • You don't need to have a separate lock objects. If you're locking on a specific object, I think you should use that object in the lock.



  • If you're just calling a single method in your lambda, you can use more succinct syntax: new Disposable(() => this.Unsubscribe(observer)).



  • If you want to make your code thread-safe, you need to lock all your reads too. In Unsubscribe() you don't do that, which means someone could write to the list while you're reading it. The same problem is in Peek().



  • Dequeue() shouldn't return default(T) if the queue is empty. This can be problematic especially with value types. Instead, you should have a method like bool TryDequeue(out T result).



  • You're accessing Count outside of a lock. I wouldn't rely on the fact that doing this is safe, I think you should lock before accessing it too.



  • Since you're using Rx, you might as well use their Disposable.Create() instead of creating your own.

Context

StackExchange Code Review Q#21574, answer score: 2

Revisions (0)

No revisions yet.