Golang channel in C#
Problem
I have tried to mimic Golang channels in C#, and its performance is pretty good compared to Go itself. On my machine, each Go channel operation takes ~75 ns and each `Chan` (in C#) operation takes ~90 ns. Please let me know if this code can be improved in any way.

    class Chan<T>
    {
        readonly int size;
        T[] buffer;
        long head = -1;
        long tail = -1;
        long closed = 0;

        public Chan() { this.size = 0; }

        public Chan(int size)
        {
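            // NOTE: the text between "size <" and the SpinUntil lambda was lost in extraction.
            // The size check, the buffer allocation and the name headLock are assumptions;
            // To() is reconstructed by symmetry with From() below.
            if (size < 0) throw new ArgumentOutOfRangeException("size");
            this.size = size;
            buffer = new T[size];
        }

        object headLock = new object();

        public bool To(T t)
        {
            lock (headLock)
            {
                long localClosed = 0L;
                if (tail - head == buffer.Length) SpinWait.SpinUntil(() => (localClosed = Interlocked.Read(ref closed)) > 0 || tail - head < buffer.Length);
                if (localClosed > 0) return false;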
                var newTail = Interlocked.Increment(ref tail);
                buffer[newTail % buffer.Length] = t;
                return true;
            }
        }

        object tailLock = new object();

        public bool From(out T val)
        {
            lock (tailLock)
            {
                long localClosed = 0L;
                if (tail - head == 0) SpinWait.SpinUntil(() => (localClosed = Interlocked.Read(ref closed)) > 0 || tail - head > 0);
                if (localClosed > 0)
                {
                    val = default(T);
                    return false;
                }
                var newHead = Interlocked.Increment(ref head);
                val = buffer[newHead % buffer.Length];
                return true;
            }
        }

        public void Close()
        {
            Interlocked.Increment(ref closed);
        }
    }

Solution
- `Chan` sounds like the abbreviated name for `Channel`, and apparently it is a channel. So I'd use `Channel` or maybe even `GoChannel`.
- The most commonly used naming convention I have seen for private members is to prefix them with an underscore. This way you can see at first glance whether it's a local variable or a class member. This also means you can get rid of `this.` most of the time.
- I really prefer to spell out the access modifier even if `private` is the default, but YMMV.
- Method names should describe actions or operations (because they operate on data and sometimes modify the state of the object). `To` and `From` are not actions or operations. The channel has fixed-size queue (FIFO) semantics, and I initially considered suggesting `Enqueue` and `Dequeue`, but the semantics of the underlying data structure should not be exposed. Rather use the `Send` and `Receive` semantics from the Go definition.
- Given the previous point, it could be useful to have a `Peek` method to check what will come next.
- `_head` and `_tail` are `long`s, and reads of 64-bit values are not guaranteed to be atomic (they can tear on 32-bit platforms), so you should use `Interlocked.Read` to obtain them.
- Also, the implementation is actually broken. Assume two threads A and B, where A calls `Send()` and B calls `Receive()` for the first time, so `_head == _tail == -1`:
    - A: executes `Interlocked.Increment(ref _tail)` (`_tail` is now 0)
    - B: `_tail - _head > 0` is true (0 - (-1) == 1), so B leaves the spin-wait
    - B: executes `Interlocked.Increment(ref _head)` (`_head` is now 0)
    - B: reads `_buffer[_head]`
    - A: writes `_buffer[_tail]`
    - B has read from the buffer before the element was written.
- This problem can be easily reproduced with this test case (almost every iteration results in dupes):

    [TestCase]
    public void TestSPSC()
    {
        int numItems = 10000;
        int numIterations = 100;
        for (int i = 0; i < numIterations; ++i)
        {
            var channel = new Channel<int>(100);
            var writer = Task.Factory.StartNew(() => { foreach (var num in Enumerable.Range(1, numItems)) { channel.Send(num); } channel.Close(); });
            var reader = Task.Factory.StartNew<List<int>>(() => {
                var numbers = new List<int>(numItems);
                for (int idx = 1; idx <= numItems; ++idx)
                {
                    int num;
                    var res = channel.Receive(out num);
                    numbers.Add(num);
                }
                return numbers.OrderBy(x => x).ToList();
            });
            Task.WaitAll(writer, reader);
            var dupes = reader.Result.GroupBy(x => x).Where(g => g.Count() > 1).ToList();
            if (dupes.Count > 0)
            {
                Console.WriteLine("{0}: {1} DUPES!", i, dupes.Count);
            }
        }
    }

I changed the implementation to use .NET's `BlockingCollection<T>` wrapped around a `ConcurrentQueue<T>`:

    public class Channel<T>
    {
        private BlockingCollection<T> _buffer;

        public Channel() : this(1) { }

        public Channel(int size)
        {
            _buffer = new BlockingCollection<T>(new ConcurrentQueue<T>(), size);
        }

        public bool Send(T t)
        {
            try
            {
                _buffer.Add(t);
            }
            catch (InvalidOperationException)
            {
                // will be thrown when the collection gets closed
                return false;
            }
            return true;
        }

        public bool Receive(out T val)
        {
            try
            {
                val = _buffer.Take();
            }
            catch (InvalidOperationException)
            {
                // will be thrown when the collection is empty and got closed
                val = default(T);
                return false;
            }
            return true;
        }

        public void Close()
        {
            _buffer.CompleteAdding();
        }

        public IEnumerable<T> Range()
        {
            T val;
            while (Receive(out val))
            {
                yield return val;
            }
        }
    }

The code is much easier to read and probably has fewer bugs than your self-implemented one. It's also fast: I can pump 10,000,000 items (I tested with `int`) through a channel (buffer size 100) with a single producer and a single consumer in 5 s. That's 0.5 µs (500 ns) per item.

    [TestCase]
    public void TestSPSC_Performance()
    {
        int numItems = 10000000;
        int numIterations = 10;
        var stopWatch = new Stopwatch();
        stopWatch.Start();
        for (int i = 0; i < numIterations; ++i)
        {
            var channel = new Channel<int>(100);
            var writer = Task.Factory.StartNew(() => { foreach (var num in Enumerable.Range(1, numItems)) { channel.Send(num); } channel.Close(); });
            var reader = Task.Factory.StartNew<List<int>>(() => { var res = new List<int>(numItems); foreach (var num in channel.Range()) { res.Add(num); } return res; });
            Task.WaitAll(writer, reader);
        }
        stopWatch.Stop();
        var elapsedMs = stopWatch.Elapsed.TotalMilliseconds;
        // ms -> ns is a factor of 1,000,000 (a factor of 1,000 would yield microseconds)
        Console.WriteLine("SPSC N = {0}: {1:.00}ms/iteration, {2:.00}ns/item (tx+rx)", numItems, elapsedMs / numIterations, elapsedMs * 1000000.0 / numItems / numIterations);
    }

Code Snippets
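The race in the walkthrough above comes from publishing the new `_tail` before the slot has been written. A minimal sketch of the opposite order (write the slot first, then publish the index) could look like the following. It assumes exactly one producer thread and one consumer thread, and `SpscRing`, `TrySend` and `TryReceive` are hypothetical names that appear in neither the question nor the answer. It only illustrates the ordering and is not meant as a replacement for the `BlockingCollection`-based `Channel<T>`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical single-producer/single-consumer ring buffer (illustration only).
public sealed class SpscRing<T>
{
    private readonly T[] _buffer;
    private long _head; // index of the next slot to read; written only by the consumer
    private long _tail; // index of the next slot to write; written only by the producer

    public SpscRing(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        _buffer = new T[capacity];
    }

    // Producer side. Returns false if the ring is full.
    public bool TrySend(T item)
    {
        long tail = _tail; // only this thread writes _tail
        if (tail - Volatile.Read(ref _head) == _buffer.Length) return false;
        _buffer[tail % _buffer.Length] = item; // 1. write the slot
        Volatile.Write(ref _tail, tail + 1);   // 2. only then publish it
        return true;
    }

    // Consumer side. Returns false if the ring is empty.
    public bool TryReceive(out T item)
    {
        long head = _head; // only this thread writes _head
        if (Volatile.Read(ref _tail) == head)
        {
            item = default(T);
            return false;
        }
        item = _buffer[head % _buffer.Length]; // 1. read the slot
        Volatile.Write(ref _head, head + 1);   // 2. only then hand it back to the producer
        return true;
    }
}

public static class Demo
{
    public static void Main()
    {
        const int numItems = 100000;
        var ring = new SpscRing<int>(100);

        // Producer: spins while the ring is full.
        var writer = Task.Run(() =>
        {
            var spin = new SpinWait();
            for (int i = 1; i <= numItems; ++i)
            {
                while (!ring.TrySend(i)) spin.SpinOnce();
            }
        });

        // Consumer: every value must be exactly one larger than the previous one.
        bool inOrder = true;
        int last = 0;
        var spinner = new SpinWait();
        for (int received = 0; received < numItems; )
        {
            int value;
            if (!ring.TryReceive(out value)) { spinner.SpinOnce(); continue; }
            inOrder &= value == last + 1;
            last = value;
            ++received;
        }
        writer.Wait();
        Console.WriteLine(inOrder ? "in order, no dupes" : "ORDER VIOLATION");
    }
}
```

With several producers or consumers this ordering alone is not enough, because two threads could claim the same index; that case needs a lock or a compare-and-swap on the index, or simply the `BlockingCollection` approach shown in the answer.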
Context
StackExchange Code Review Q#32500, answer score: 13