pattern · csharp · Moderate

Golang channel in C#

Submitted by: @import:stackexchange-codereview
golang · channel · stackoverflow

Problem

I have tried to mimic Golang channels in C#, and the performance is pretty good compared to Go itself: on my machine, each Go channel operation takes ~75 ns and each Chan operation in C# takes ~90 ns.

Please let me know if this code can be improved in any way.

class Chan<T>
{
    readonly int size;

    T[] buffer;
    long head = -1;
    long tail = -1;
    long closed = 0;

    public Chan() { this.size = 0; }
    public Chan(int size)
    {
        if (size … // (text lost in extraction: the rest of this constructor and the start of the To method)
            … (localClosed = Interlocked.Read(ref closed)) > 0 || tail - head … 0) return false;

            var newTail = Interlocked.Increment(ref tail);
            buffer[newTail % buffer.Length] = t;

            return true;
        }
    }

    object tailLock = new object();
    public bool From(out T val)
    {
        lock (tailLock)
        {
            long localClosed = 0L;

            if (tail - head == 0) SpinWait.SpinUntil(() => (localClosed = Interlocked.Read(ref closed)) > 0 || tail - head > 0);
            if (localClosed > 0)
            {
                val = default(T);
                return false;
            }

            var newHead = Interlocked.Increment(ref head);
            val = buffer[newHead % buffer.Length];

            return true;
        }
    }

    public void Close()
    {
        Interlocked.Increment(ref closed);
    }
}

Solution


  • Chan is presumably short for Channel, and that is what it is, so I'd spell it out: Channel, or maybe even GoChannel.
  • The most common naming convention I have seen for private members is an underscore prefix. That way you can tell at a glance whether a name is a local variable or a class member, and you can drop this. most of the time.
  • I prefer to spell out the access modifier even though private is the default, but YMMV.
  • Method names should describe actions or operations, because they operate on data and sometimes modify the state of the object. To and From are not actions. My first thought was Enqueue and Dequeue, since the channel has fixed-size FIFO semantics, but the semantics of the underlying data structure should not be exposed. Rather, use the Send and Receive names from Go's definition.
  • Given the FIFO semantics, a Peek method to check what comes next could also be useful.
  • _head and _tail are longs, and reads of a long are not guaranteed to be atomic (on 32-bit platforms a 64-bit read can observe a half-written value), so you should use Interlocked.Read to obtain them.
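
A minimal sketch of how the fields might look with the conventions above applied: explicit private, an underscore prefix, and Interlocked for the 64-bit counters. The class RingIndices and its members are hypothetical illustrations, not part of the original code.

```csharp
using System.Threading;

// Hypothetical class illustrating the suggested conventions; not the original code.
public sealed class RingIndices
{
    // Explicit "private" and an underscore prefix mark these as fields at a glance.
    private long _head = -1;
    private long _tail = -1;

    // Interlocked.Read performs an atomic 64-bit read, even on 32-bit platforms
    // where a plain read of a long can observe a half-written value.
    // Note: the two reads are separate, so Count is not an atomic snapshot.
    public long Count => Interlocked.Read(ref _tail) - Interlocked.Read(ref _head);

    // Interlocked.Increment atomically bumps the counter and returns the new value.
    public long AdvanceTail() => Interlocked.Increment(ref _tail);
    public long AdvanceHead() => Interlocked.Increment(ref _head);
}
```

For example, two AdvanceTail() calls followed by one AdvanceHead() leave Count at 1.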



- Also, the implementation is actually broken. Assume two threads, A and B: A calls Send() and B calls Receive(), starting from the initial state _head == _tail == -1:

  • A: executes Interlocked.Increment(ref _tail) (_tail is now 0)
  • B: _tail - _head > 0 is true (0 - (-1) == 1), so B leaves the spin wait
  • B: executes Interlocked.Increment(ref _head) (_head is now 0)
  • B: reads _buffer[_head]
  • A: writes _buffer[_tail]
  • B has read from the buffer before the element was written.

- This problem is easy to reproduce with this test case (almost every iteration results in duplicates):

[TestCase]
public void TestSPSC()
{
    int numItems = 10000;
    int numIterations = 100;

    for (int i = 0; i < numIterations; ++i)
    {
        var channel = new Channel<int>(100);
        var writer = Task.Factory.StartNew(() => { foreach (var num in Enumerable.Range(1, numItems)) { channel.Send(num); } channel.Close(); });
        var reader = Task.Factory.StartNew<List<int>>(() => { 
            var numbers = new List<int>(numItems);
            for (int idx = 1; idx <= numItems; ++idx)
            {
                int num;
                var res = channel.Receive(out num);
                numbers.Add(num);
            }
            return numbers.OrderBy(x => x).ToList();
        });
        Task.WaitAll(writer, reader);
        var dupes = reader.Result.GroupBy(x => x).Where(g => g.Count() > 1).ToList();
        if (dupes.Count > 0)
        {
            Console.WriteLine("{0}: {1} DUPES!", i, dupes.Count);
        }
    }
}
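
The race comes from publishing the new _tail before the slot is written. One well-known way to avoid it, swapped in here rather than taken from the original post, is a Lamport-style single-producer/single-consumer ring buffer: the producer writes the slot and only then publishes the index, and the consumer reads the slot and only then frees it. This sketch assumes exactly one sending thread and one receiving thread; SpscRing, TrySend and TryReceive are hypothetical names, and the Try* methods do not block (a caller could spin on them, as the original code did with SpinWait.SpinUntil).

```csharp
using System;
using System.Threading;

// Hypothetical Lamport-style SPSC ring buffer, not the original poster's code.
// Assumption: exactly one thread calls TrySend and exactly one other thread calls TryReceive.
public sealed class SpscRing<T>
{
    private readonly T[] _buffer;
    private long _head; // items consumed so far; written only by the consumer
    private long _tail; // items published so far; written only by the producer

    public SpscRing(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _buffer = new T[capacity];
    }

    // Non-blocking send: returns false if the buffer is full.
    public bool TrySend(T item)
    {
        long tail = Interlocked.Read(ref _tail);
        long head = Interlocked.Read(ref _head);
        if (tail - head >= _buffer.Length) return false; // full

        // 1) Write the slot first...
        _buffer[tail % _buffer.Length] = item;
        // 2) ...then publish it. Interlocked operations act as full memory barriers,
        //    so the consumer never sees the new _tail before the slot write.
        Interlocked.Exchange(ref _tail, tail + 1);
        return true;
    }

    // Non-blocking receive: returns false if the buffer is empty.
    public bool TryReceive(out T item)
    {
        long head = Interlocked.Read(ref _head);
        // Acts as a full barrier too, so the slot read below cannot move ahead of it.
        long tail = Interlocked.Read(ref _tail);
        if (tail == head)
        {
            item = default(T);
            return false; // empty
        }

        long index = head % _buffer.Length;
        item = _buffer[index];
        _buffer[index] = default(T); // drop the reference so it can be collected
        // Free the slot only after reading it, so the producer cannot overwrite it early.
        Interlocked.Exchange(ref _head, head + 1);
        return true;
    }
}
```

With multiple producers or consumers this scheme is not safe, which is one reason the BlockingCollection version below is the simpler choice.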


I changed the implementation to use .NET's BlockingCollection wrapped around a ConcurrentQueue:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class Channel<T>
{
    private readonly BlockingCollection<T> _buffer;

    // BlockingCollection requires a positive bound, so a buffer of 1 is the
    // closest approximation to an unbuffered Go channel.
    public Channel() : this(1) { }
    public Channel(int size)
    {
        _buffer = new BlockingCollection<T>(new ConcurrentQueue<T>(), size);
    }

    public bool Send(T t)
    {
        try
        {
            _buffer.Add(t);
        }
        catch (InvalidOperationException)
        {
            // will be thrown when the collection gets closed
            return false;
        }
        return true;
    }

    public bool Receive(out T val)
    {
        try
        {
            val = _buffer.Take();
        }
        catch (InvalidOperationException)
        {
            // will be thrown when the collection is empty and got closed
            val = default(T);
            return false;
        }
        return true;
    }

    public void Close()
    {
        _buffer.CompleteAdding();
    }

    public IEnumerable<T> Range()
    {
        T val;
        while (Receive(out val))
        {
            yield return val;
        }
    }
}
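
The wrapper's Close, Send and Receive behaviour rests on two documented BlockingCollection<T> rules: Add throws InvalidOperationException once CompleteAdding has been called, and Take throws it once the collection is both completed and empty. This sketch (CloseSemanticsDemo and Run are hypothetical names) replays those calls directly on a BlockingCollection<int>, and also shows that items buffered before the close are still delivered, much like receiving from a closed Go channel. One difference from Go: sending on a closed Go channel panics, whereas this Send returns false.

```csharp
using System;
using System.Collections.Concurrent;

public static class CloseSemanticsDemo
{
    // Hypothetical helper: replays Channel<T>'s Close/Send/Receive calls
    // directly against BlockingCollection<T> and reports what happened.
    public static (bool SendFailed, int First, int Second, bool ReceiveFailed, bool IsCompleted) Run()
    {
        // Same construction as Channel<int>(3); only 2 items are added,
        // so the Add after the close cannot block on a full buffer.
        var buffer = new BlockingCollection<int>(new ConcurrentQueue<int>(), 3);
        buffer.Add(1);
        buffer.Add(2);
        buffer.CompleteAdding(); // what Channel<T>.Close() calls

        // Add after CompleteAdding throws, which Channel<T>.Send turns into "return false".
        bool sendFailed = false;
        try { buffer.Add(3); }
        catch (InvalidOperationException) { sendFailed = true; }

        // Items buffered before the close are still delivered, in FIFO order.
        int first = buffer.Take();
        int second = buffer.Take();

        // Closed and drained: Take throws, which Channel<T>.Receive turns into "return false".
        bool receiveFailed = false;
        try { buffer.Take(); }
        catch (InvalidOperationException) { receiveFailed = true; }

        return (sendFailed, first, second, receiveFailed, buffer.IsCompleted);
    }
}
```

Calling CloseSemanticsDemo.Run() should report SendFailed = true, First = 1, Second = 2, ReceiveFailed = true and IsCompleted = true.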


The code is much easier to read and probably has fewer bugs than your hand-rolled implementation. Performance is reasonable too: I can pump 10,000,000 items (I tested with int) through a channel with a buffer size of 100, single producer and single consumer, in about 5 s. That's about 0.5 µs (500 ns) per item, send plus receive.

[TestCase]
public void TestSPSC_Performance()
{
    int numItems = 10000000;
    int numIterations = 10;

    var stopWatch = new Stopwatch();
    stopWatch.Start();
    for (int i = 0; i < numIterations; ++i)
    {
        var channel = new Channel<int>(100);
        var writer = Task.Factory.StartNew(() => { foreach (var num in Enumerable.Range(1, numItems)) { channel.Send(num); } channel.Close(); });
        var reader = Task.Factory.StartNew<List<int>>(() => { var res = new List<int>(numItems); foreach (var num in channel.Range()) { res.Add(num); } return res; });
        Task.WaitAll(writer, reader);
    }
    stopWatch.Stop();

    // ms -> ns is a factor of 1,000,000 (ms * 1000 would give microseconds)
    var elapsedMs = stopWatch.Elapsed.TotalMilliseconds;
    Console.WriteLine("SPSC N = {0}: {1:0.00}ms/iteration, {2:0.00}ns/item (tx+rx)", numItems, elapsedMs / numIterations, elapsedMs * 1000000.0 / numItems / numIterations);
}
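
Converting milliseconds to nanoseconds takes a factor of 10^6, and TotalMilliseconds × 1000 yields microseconds instead, which is an easy factor-of-1000 slip. A hedged alternative is to work from TimeSpan ticks, which are 100 ns each; Throughput.NanosecondsPerItem is a hypothetical helper, not part of the original test.

```csharp
using System;

public static class Throughput
{
    // Hypothetical helper. TimeSpan ticks are 100 ns each
    // (TimeSpan.TicksPerMillisecond == 10,000), so ns = ticks * 100.
    public static double NanosecondsPerItem(TimeSpan elapsed, long items)
    {
        if (items <= 0) throw new ArgumentOutOfRangeException(nameof(items));
        return elapsed.Ticks * 100.0 / items;
    }
}
```

For example, 5 seconds for 10,000,000 items gives Throughput.NanosecondsPerItem(TimeSpan.FromSeconds(5), 10_000_000) == 500. In the test, stopWatch.Elapsed can be passed directly, with (long)numItems * numIterations as the item count.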


Context

StackExchange Code Review Q#32500, answer score: 13
