ObservablePriorityQueue<T> Implementation
Problem
My current project requires a prioritised queue that supports the `IObservable<T>` interface. Please point out any problems with my current implementation:
ObservablePriorityQueue
```
public sealed class ObservablePriorityQueue<T> : IQueue<T>, IObservable<T> where T : IPrioritised
{
    #region "IObservable Implementation"
    // A list of the subscribers for the IObservable implementation
    List<IObserver<T>> _subscribers = new List<IObserver<T>>(10);
    Object _observableSyncLock = new Object();
    #region "Interface specific"
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("The observer cannot be null.");
        }
        if (!_subscribers.Contains(observer))
        {
            lock (_observableSyncLock)
            {
                _subscribers.Add(observer);
            }
        }
        return new Disposable(() =>
        {
            this.Unsubscribe(observer);
        });
    }
    #endregion
    public void Unsubscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("The observer cannot be null.");
        }
        observer.OnCompleted();
        if (_subscribers.Contains(observer))
        {
            // remove the entry, but don't dispose it just
            // in case they want to re-subscribe with the same observer later
            lock (_observableSyncLock)
            {
                _subscribers.Remove(observer);
            }
        }
    }
    #endregion
    #region "IQueue Implementation"
    readonly List<T> _data = new List<T>(100);
    Object _queueSyncLock = new Object();
    public void Enqueue(T value)
    {
        if (value == null)
        {
            throw new ArgumentException("The item to be enqueued cannot be null");
        }
        lock (_queueSyncLock)
        {
            _data.Add(value);
        }
        // now that the entry has been
```
Solution
- At first, I was confused about what exactly the implementation of `IObservable<T>` meant here. You should document that properly.
- You don't need separate lock objects. If a lock protects one specific object, I think you should use that object in the `lock` statement.
- If you're just calling a single method in your lambda, you can use the more succinct syntax: `new Disposable(() => this.Unsubscribe(observer))`.
- If you want to make your code thread-safe, you need to lock all your reads too. In `Unsubscribe()` you don't do that, which means someone could write to the list while you're reading it. The same problem is in `Peek()`.
- `Dequeue()` shouldn't return `default(T)` if the queue is empty. This can be problematic, especially with value types, where `default(T)` cannot be told apart from a real item. Instead, you should have a method like `bool TryDequeue(out T result)`.
- You're accessing `Count` outside of a lock. I wouldn't rely on that being safe; I think you should `lock` before accessing it too.
- Since you're using Rx, you might as well use their `Disposable.Create()` instead of creating your own.
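To make the locking and `TryDequeue` points concrete, here is a minimal sketch of how the queue side could look. It is not the asker's class: `LockedPriorityQueue<T>` and `Job` are hypothetical names, and because the question does not show `IPrioritised`, the sketch assumes it exposes a single `int Priority` where a higher value is dequeued first. Every access to the list, reads included, takes a lock on the list itself.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in: the question does not show IPrioritised,
// so a single integer Priority (higher wins) is assumed here.
public interface IPrioritised
{
    int Priority { get; }
}

// Sketch only: illustrates the locking and TryDequeue suggestions.
public sealed class LockedPriorityQueue<T> where T : IPrioritised
{
    // The list is private, so it can double as its own lock object.
    private readonly List<T> _data = new List<T>();

    // Count is read under the same lock as every other access.
    public int Count
    {
        get { lock (_data) { return _data.Count; } }
    }

    public void Enqueue(T value)
    {
        if (value == null)
        {
            throw new ArgumentNullException(nameof(value));
        }
        lock (_data)
        {
            _data.Add(value);
        }
    }

    // Reports an empty queue through the return value instead of default(T).
    public bool TryDequeue(out T result)
    {
        lock (_data)
        {
            if (_data.Count == 0)
            {
                result = default(T);
                return false;
            }
            // Linear scan for the highest priority; the first one found wins ties.
            int best = 0;
            for (int i = 1; i < _data.Count; i++)
            {
                if (_data[i].Priority > _data[best].Priority)
                {
                    best = i;
                }
            }
            result = _data[best];
            _data.RemoveAt(best);
            return true;
        }
    }
}

// Hypothetical item type used only to exercise the sketch.
public sealed class Job : IPrioritised
{
    public Job(string name, int priority)
    {
        Name = name;
        Priority = priority;
    }

    public string Name { get; }
    public int Priority { get; }
}
```

With that shape, a consumer can write `while (queue.TryDequeue(out var job)) { ... }` and never needs to check `Count` first, which also removes the race between checking and dequeuing.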
Context
StackExchange Code Review Q#21574, answer score: 2