Implementation of costreams

Submitted by: @import:stackexchange-codereview

Problem

I am not very good with thread-safety and often fall prey to subtle issues in concurrency. Therefore, I hope that someone here might be able to tell me whether there is a subtle concurrency issue (race condition etc.) in the following code, or whether it’s fine. In particular, have I used Monitor.Wait and Monitor.PulseAll correctly?

Of course, if you can reason about the code and come to the conclusion that it is already correct, that would be a welcome answer too.

This code is intended to implement the costreams pattern (I invented that name, so you won’t find it on Google). It runs two methods (passed in as delegates) in parallel. It provides one of those methods with a write-only stream and the other with a read-only stream. The idea is for one of them to generate data and write it to the stream, and for the other to read from the stream and consume the data. (The reading/writing methods intended to be passed in could be anything that reads from/writes to a stream; they are not likely to be specifically written to be used with costreams. If they were, they could probably be rewritten so that they wouldn’t need to use streams at all.)

While reviewing the Read method, remember that the contract for Stream.Read is slightly counterintuitive: it is allowed to read and return fewer bytes than requested (as long as it returns the number of bytes actually read). Thus the fact that it sometimes returns fewer bytes than the count parameter requests is not a bug. Of course it must not return 0 except when the end of the stream is reached.
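
To make that contract concrete, here is a minimal sketch of how a caller typically copes with short reads. The helper name `ReadUpTo` and the class around it are hypothetical and not part of the code under review; the sketch only assumes the documented behaviour of `Stream.Read`.

```
using System;
using System.IO;

static class StreamReadExample
{
    // Hypothetical helper: keeps calling Read until 'count' bytes have
    // arrived or the stream reports end-of-stream by returning 0.
    public static int ReadUpTo(Stream stream, byte[] buffer, int offset, int count)
    {
        int total = 0;
        while (total < count)
        {
            // Read may legally return fewer bytes than requested.
            int n = stream.Read(buffer, offset + total, count - total);
            if (n == 0)
                break; // 0 means end of stream, never "no data yet"
            total += n;
        }
        return total;
    }
}
```

A consumer written this way works with any stream that honours the contract, which is why a costream's Read has to block rather than return 0 while the writer is still producing.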

```
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace MyLibrary
{
    public static class Costreams
    {
        /// Runs the two specified processes in parallel, allowing one to generate data by writing it to a stream, and the other to consume the data by reading it from a stream.
        /// An action that generates data and writes it to a stream.
        /// An action th
```

Solution

That depends on what you're trying to do. Is it really possible to have multiple threads calling Read, as the comment suggests? If so, you run the risk of several Reads being released at once and returning in the wrong order. If not, why use PulseAll rather than Pulse?

It feels wrong to me that the two cases behave differently. If Read is called while there are items on the queue, it holds the lock and further calls to Write are locked out. But if Read Waits until Write releases it, and Write is then called again, Write will be able to lock the queue and use it. That said, it shouldn't be a problem, given that Read will be dealing with a different item at the time.
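
The two points above (Pulse versus PulseAll, and Wait giving up the lock) can be illustrated with a minimal sketch of the usual Monitor pattern. `MonitorQueue<T>` is a hypothetical class written for illustration, not the code from the question.

```
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration class; not part of the reviewed code.
sealed class MonitorQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();

    public void Put(T item)
    {
        lock (_items)
        {
            _items.Enqueue(item);
            // One item was added, so waking one waiter is enough here.
            // PulseAll is only needed when one change can satisfy several waiters.
            Monitor.Pulse(_items);
        }
    }

    public T Take()
    {
        lock (_items)
        {
            // Wait releases the lock while blocked, so Put can acquire it,
            // and re-acquires the lock before returning. The loop re-checks
            // the condition because another consumer may have got there first.
            while (_items.Count == 0)
                Monitor.Wait(_items);
            return _items.Dequeue();
        }
    }
}
```

With several consumers this keeps the queue consistent, but it says nothing about the order in which blocked Take calls return, which is the ordering risk mentioned above.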

But that raises the question: why bother with the Wait? Why not replace the whole thing with a ManualResetEvent? That way, you only have to lock the queue while you're updating it: Set the event when you add data, and Reset it when you remove the last item.
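
Stripped of the stream plumbing, the idea is roughly the following sketch. `SignalledQueue<T>` is a hypothetical name, and the sketch assumes a single consumer thread.

```
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration class; assumes exactly one consumer thread.
sealed class SignalledQueue<T> : IDisposable
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly ManualResetEvent _hasData = new ManualResetEvent(false);

    public void Enqueue(T item)
    {
        lock (_items)
        {
            _items.Enqueue(item);
            // Set inside the lock so it cannot interleave with the Reset below.
            _hasData.Set();
        }
    }

    public T Dequeue()
    {
        // Blocks without holding the lock, so the producer is never locked out.
        _hasData.WaitOne();
        lock (_items)
        {
            T item = _items.Dequeue();
            if (_items.Count == 0)
                _hasData.Reset(); // last item removed: the next call blocks again
            return item;
        }
    }

    public void Dispose() { _hasData.Dispose(); }
}
```

Because Set and Reset both happen under the queue lock, the event is signalled exactly when the queue is non-empty. With more than one consumer, two threads could pass WaitOne for a single item and the second Dequeue would throw, so this shape only fits the one-reader case.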

I haven't tested it, but it will look a lot like this:

```
public static class Costreams
{
    /// <summary>Runs the two specified processes in parallel, allowing one to generate data by writing it to a stream, and the other to consume the data by reading it from a stream.</summary>
    /// <param name="writingAction">An action that generates data and writes it to a stream.</param>
    /// <param name="readingAction">An action that will want to read information from a stream.</param>
    public static void RunCostreams(Action<Stream> writingAction, Action<Stream> readingAction)
    {
        // Everything the writingAction writes will be enqueued in here and dequeued by the readingAction
        var queue = new Queue<byteChunk>();
        using (var hasData = new ManualResetEvent(false))
        {
            writingCostream writer = new writingCostream(queue, hasData);
            readingCostream reader = new readingCostream(queue, hasData);

            // Start reading in a new thread. The first call to reader.Read() will block until there is something in the queue to read.
            var thread = new Thread(() => readingAction(reader));
            thread.Start();

            // Start writing. Calls to writer.Write() will place the data in the queue and signal the reading thread.
            writingAction(writer);

            // Insert a null at the end of the queue to signal to the reader that this is where the data ends.
            lock(queue)
            {
                queue.Enqueue(null);
                hasData.Set();
            }

            // Wait for the reader to consume all the remaining data.
            thread.Join();
        }
    }

    private sealed class byteChunk
    {
        public byte[] Buffer;
        public int Offset;
        public int Count;
    }

    private sealed class readingCostream : Stream
    {
        private Queue<byteChunk> _queue;
        private ManualResetEvent _hasData;

        public readingCostream(Queue<byteChunk> queue, ManualResetEvent hasData)
        {
            _queue = queue;
            _hasData = hasData;
        }

        public override bool CanRead { get { return true; } }
        public override bool CanSeek { get { return false; } }
        public override bool CanWrite { get { return false; } }
        public override void Flush() { }
        public override long Length { get { throw new NotSupportedException(); } }
        public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
        public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
        public override void SetLength(long value) { throw new NotSupportedException(); }
        public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

        public override int Read(byte[] buffer, int offset, int count)
        {
            // If there is no data waiting to be read, wait for it.
            _hasData.WaitOne();

            byteChunk peeked;
            lock(_queue)
                peeked = _queue.Peek();

            // A null element in the queue signals the end of the stream. Don't dequeue this item.
            if (peeked == null)
                return 0;

            if (peeked.Count <
            // ... (the rest of Read is missing from the source) ...
        }
    }

    private sealed class writingCostream : Stream
    {
        private Queue<byteChunk> _queue;
        private ManualResetEvent _hasData;

        // The parameter is named hasData (not _hasData) so that the assignment below actually sets the field.
        public writingCostream(Queue<byteChunk> queue, ManualResetEvent hasData)
        {
            _queue = queue;
            _hasData = hasData;
        }

        public override bool CanRead { get { return false; } }
        public override bool CanSeek { get { return false; } }
        public override bool CanWrite { get { return true; } }
        public override void Flush() { }
        public override long Length { get { throw new NotSupportedException(); } }
        public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
        public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
        public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
        public override void SetLength(long va
```

Code Snippets

```
public static class Costreams
{
    /// <summary>Runs the two specified processes in parallel, allowing one to generate data by writing it to a stream, and the other to consume the data by reading it from a stream.</summary>
    /// <param name="writingAction">An action that generates data and writes it to a stream.</param>
    /// <param name="readingAction">An action that will want to read information from a stream.</param>
    public static void RunCostreams(Action<Stream> writingAction, Action<Stream> readingAction)
    {
        // Everything the writingAction writes will be enqueued in here and dequeued by the readingAction
        var queue = new Queue<byteChunk>();
        using (var hasData = new ManualResetEvent(false))
        {
            writingCostream writer = new writingCostream(queue, hasData);
            readingCostream reader = new readingCostream(queue, hasData);

            // Start reading in a new thread. The first call to reader.Read() will block until there is something in the queue to read.
            var thread = new Thread(() => readingAction(reader));
            thread.Start();

            // Start writing. Calls to writer.Write() will place the data in the queue and signal the reading thread.
            writingAction(writer);

            // Insert a null at the end of the queue to signal to the reader that this is where the data ends.
            lock(queue)
            {
                queue.Enqueue(null);
                hasData.Set();
            }

            // Wait for the reader to consume all the remaining data.
            thread.Join();
        }
    }

    private sealed class byteChunk
    {
        public byte[] Buffer;
        public int Offset;
        public int Count;
    }

    private sealed class readingCostream : Stream
    {
        private Queue<byteChunk> _queue;
        private ManualResetEvent _hasData;

        public readingCostream(Queue<byteChunk> queue, ManualResetEvent hasData)
        {
            _queue = queue;
            _hasData = hasData;
        }

        public override bool CanRead { get { return true; } }
        public override bool CanSeek { get { return false; } }
        public override bool CanWrite { get { return false; } }
        public override void Flush() { }
        public override long Length { get { throw new NotSupportedException(); } }
        public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
        public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
        public override void SetLength(long value) { throw new NotSupportedException(); }
        public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

        public override int Read(byte[] buffer, int offset, int count)
        {
            // If there is no data waiting to be read, wait for it.
            _hasData.WaitOne();
```

Context

StackExchange Code Review Q#956, answer score: 5
