A Stream wrapper that executes all writes asynchronously

Submitted by: @import:stackexchange-codereview

Problem

I wanted to write a Stream that wraps another stream and buffers all writes to improve performance, comparable to the BufferedStream class.

However, BufferedStream is strictly synchronous: once its inner buffer is full, it blocks until the contents have been written to the underlying stream. This stream should never block on writes; instead, it uses a separate thread to execute them.

It blocks only when .Flush() is called or the stream is disposed.

My main worries are the .Flush() and .Dispose() methods, since it seems easy to miss a corner case with multithreading there.

The class is supposed to be thread-safe, allowing simultaneous calls from multiple threads, although I suppose this would not make sense in most cases.
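
The design described above (queue every write, drain the queue on a background task, block only in Flush() and Dispose()) might look roughly like the following. This is a minimal sketch under stated assumptions, not the class under review: the name QueuedWriteStream, its fields, the byte[] block type and the two locks are all hypothetical, and error handling is reduced to rethrowing the first background failure from Flush().

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch, not the AsyncBufferedStream under review.
sealed class QueuedWriteStream : Stream
{
    private readonly Stream inner;
    private readonly BlockingCollection<byte[]> queue = new BlockingCollection<byte[]>();
    private readonly Task drainTask;
    private readonly object gate = new object();    // guards pending and failure
    private readonly object ioLock = new object();  // serializes access to inner
    private long pending;                           // blocks queued but not yet written
    private Exception failure;                      // first error raised by the background writer
    private bool disposed;

    public QueuedWriteStream(Stream inner)
    {
        this.inner = inner;
        drainTask = Task.Factory.StartNew(Drain, TaskCreationOptions.LongRunning);
    }

    // Runs on the background thread until CompleteAdding() is called and the queue is empty.
    private void Drain()
    {
        try
        {
            foreach (byte[] block in queue.GetConsumingEnumerable())
            {
                lock (ioLock) { inner.Write(block, 0, block.Length); }
                lock (gate)
                {
                    pending--;
                    Monitor.PulseAll(gate);
                }
            }
        }
        catch (Exception ex)
        {
            lock (gate) { failure = ex; Monitor.PulseAll(gate); }
        }
    }

    // Copies the caller's bytes and returns without touching the inner stream.
    public override void Write(byte[] buffer, int offset, int count)
    {
        var copy = new byte[count];
        Buffer.BlockCopy(buffer, offset, copy, 0, count);
        lock (gate) { pending++; }
        queue.Add(copy);
    }

    // Blocks until the queue has been fully written, then flushes the inner stream.
    public override void Flush()
    {
        lock (gate)
        {
            while (pending > 0 && failure == null) Monitor.Wait(gate);
            if (failure != null) throw new IOException("Background write failed.", failure);
        }
        lock (ioLock) { inner.Flush(); }
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !disposed)
        {
            disposed = true;          // StreamWriter plus an outer using dispose the stream twice
            queue.CompleteAdding();   // lets Drain finish once the queue is empty
            drainTask.Wait();
            inner.Dispose();
            queue.Dispose();
        }
        base.Dispose(disposing);
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}
```

The corner cases mentioned above show up even in this small version: Flush() has to wait for the background writer without racing it on the inner stream, and Dispose() has to tolerate being called twice.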

It is intended to be used like this:

```
using (var stream = new AsyncBufferedStream(System.IO.File.Create(@"c:\xyz.dat")))
using (var writer = new StreamWriter(stream))
{
    // perform processor-bound work to get the next line to be written
    // then write the line
    writer.Write("abc");
}
```


(A short main program is at the end of the code)

```
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace xyz
{
    /// Wrap a stream into an asynchronous buffer. All writes will be queued to be executed asynchronously
    class AsyncBufferedStream : Stream
    {
        /// wrapped inner stream; we pass every write through to this stream
        private Stream wrappedStream;

        /// buffer, contains all items not yet written to the wrapped stream
        private BlockingCollection blockBuffer = new BlockingCollection();

        /// The Task that writes to the wrapped Stream
        private Task writeTask;

        private long countBytesInBuffer = 0;
        private long countBlocksWritten = 0;

        /// Wrap a stream into an asynch
```

Solution

The issue is the data used in the test.

Instead of writing the same string again and again:

```
writer.WriteLine("AAAAAAA...");
for (int j = 0; j < 1000; j++)
{
    Math.Sin(Math.Cos(Math.Pow(j, j)));
}
```


Generate a random string, then write that data:

```
var testString = Enumerable.Range(0, 1000).Aggregate("", (s, x) => s + (char) random.Next(33, 126));
writer.WriteLine(testString);
```
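
Building the string with Aggregate and `+` allocates a new intermediate string for every character, which adds CPU work to whatever is being timed. A minimal sketch of an alternative, assuming a hypothetical helper named TestData.RandomLine (not part of the original answer) and the same character range as above:

```csharp
using System;
using System.Text;

static class TestData
{
    // Hypothetical helper: builds one line of `length` random printable ASCII characters.
    public static string RandomLine(Random random, int length)
    {
        var sb = new StringBuilder(length);
        for (int i = 0; i < length; i++)
        {
            // Random.Next's upper bound is exclusive, so this yields characters 33..125.
            sb.Append((char)random.Next(33, 126));
        }
        return sb.ToString();
    }
}
```

It would be called as `writer.WriteLine(TestData.RandomLine(random, 1000));` in place of the Aggregate expression.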


Full implementation of Main():

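```
private static void Main(string[] args)
{
    var random = new Random();

    // time writes through the asynchronous wrapper
    Stopwatch watch = Stopwatch.StartNew();

    using (var stream = new AsyncBufferedStream(System.IO.File.Create(@"c:\temp\test.dat")))
    using (var writer = new StreamWriter(stream))
    {
        for (int i = 0; i < 1000; i++)
        {
            writer.WriteLine(Enumerable.Range(0, 1000).Aggregate("", (s, x) => s + (char)random.Next(33, 126)));
        }
    }
    Console.WriteLine("buffered:   " + watch.ElapsedMilliseconds);

    // time writes directly to the FileStream
    watch = Stopwatch.StartNew();
    using (var stream = System.IO.File.Create(@"c:\temp\test.dat"))
    using (var writer = new StreamWriter(stream))
    {
        // 10000 iterations here, versus 1000 in the buffered run above
        for (int i = 0; i < 10000; i++)
        {
            writer.WriteLine(Enumerable.Range(0, 1000).Aggregate("", (s, x) => s + (char)random.Next(33, 126)));
        }
    }
    Console.WriteLine("unbuffered: " + watch.ElapsedMilliseconds);
    Console.ReadLine();
    return;
}
```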


There seems to be a low-level optimization for repetitive data like "AAAAAAAAAAAAAAAAA", which makes sense. It could even be an optimization at the hardware level, so you need to make sure the disk controller isn't just reusing data it already has in cache.

On average, the new class performs 5x faster for me on randomly generated data.
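
Timings like this are easier to compare when both runs write exactly the same pre-generated lines, so that neither data generation nor a different iteration count ends up in the measurement. A minimal sketch, assuming a hypothetical helper named WriteBenchmark.TimeWrites (not part of the original answer):

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;

static class WriteBenchmark
{
    // Hypothetical helper: times writing `lines` through whatever stream `open` returns.
    public static long TimeWrites(Func<Stream> open, IReadOnlyList<string> lines)
    {
        Stopwatch watch = Stopwatch.StartNew();
        using (Stream stream = open())
        using (var writer = new StreamWriter(stream))
        {
            foreach (string line in lines)
            {
                writer.WriteLine(line);
            }
        }
        // Leaving the using blocks flushes and closes the stream,
        // so that cost is included in the elapsed time.
        watch.Stop();
        return watch.ElapsedMilliseconds;
    }
}
```

With a list of lines generated once up front, the two cases would be `TimeWrites(() => new AsyncBufferedStream(File.Create(path)), lines)` and `TimeWrites(() => File.Create(path), lines)`, where `path` and `lines` are placeholders.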

Context

StackExchange Code Review Q#94119, answer score: 3
