HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

AsyncTcpClient (Asynchronous TcpClient)

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
asynchronoustcpclientasynctcpclient

Problem

I've been doing network programming using C#'s TcpClient for several years. The code below is an asynchronous wrapper for TcpClient that I developed throughout these years.

The key methods are:

  • ConnectAsync() - connects asynchronously; RemoteServerInfo is a simple class containing Host, Port, and a boolean indicating whether this is an SSL connection.



  • StartReceiving() - initiates the data reading callbacks; this method is necessary to allow time for events to be hooked up before data starts being processed



  • DataReceivedCallback() - processes data as it is received and passes it on to any subscribed event handlers



  • SendAsync() - sends data asynchronously



Some other things to notice:

  • The code uses the old Asynchronous Programming Model for asynchrony.



  • There is some play with buffer sizes. The intention of this is to have adaptive buffer sizes - using a small amount of memory most of the time, but expanding to better cater for larger incoming data (up to a specified maximum) when necessary.



  • I am using a goto statement. This might send cold shivers down your spine, but I thought it was fine in this case. Read this answer if you're religious about never using goto in any situation whatsoever.



I would really appreciate code review from other developers (especially from network programmers) to see if this implementation can be improved further. Some things that come to mind include better performance, better use of TAP over APM, and any possible subtle bugs I might have missed.

Here is the code for AsyncTcpClient:

```
public class AsyncTcpClient : IDisposable
{
private bool disposed = false;
private TcpClient tcpClient;
private Stream stream;

private int minBufferSize = 8192;
private int maxBufferSize = 15 1024 1024;
private int bufferSize = 8192;

private int BufferSize
{
get
{
return this.bufferSize;
}
set
{
this.bufferSize = va

Solution

You should only use Task.Run to start threads, which you don't want to do when you're doing IO, at least directly. You should let the runtime make that decision. Also you need to make sure your tcpClient isn't already connected. There's also a tcpClient.ConnectAsync that retruns a Task so you should use that.

Also you should never pass a CancellationTokenSource to an async method, use CancellationToken. And the .NET async library is designed to throw an OperationCanceledException on cancelation so the task is marked canceled, so use that. It's also bad practice to dispose of a class from within itself, that can have some very undeseried effects, simply close and dispose of your tcpListener

So ConnectAsync could look like:

private async Task Close()
{
   await Task.Yield();
   if(this.tcpClient != null)
   {

      this.tcpClient.Close();
      this.tcpClient.Dispose();
      this.tcpClient = null;
   }
   if(this.stream != null)
   {
       this.stream.Dispose();
       this.stream = null;
   }
}
private async Task CloseIfCanceled(CancellationTeken token, Action onClosed = null)
{
    if(token.IsCancellationRequested)
    {
        await this.Close();
        if(onClosed != null)
           onClosed();
        token.ThrowIfCancellationRequested();
    }
}
public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationToken cancellationToken = default(CancellationToken))
{
    try
    {
        //Connect async method
        await this.Close();
        cancellationToken.ThrowIfCancellationRequested();
        this.tcpClient = new TcpClient();
        canellationToken.ThrowIfCancellationRequested();
        await this.tcpClient.ConnectAsync(remoteServerInfo.Host, remoteServerInfo.Port);
        await this.CloseIfCanceled(cancelationToken);
        // get stream and do SSL handshake if applicable

        this.stream = this.tcpClient.GetStream();
        await this.CloseIfCanceled(cancelationToken);
        if (remoteServerInfo.Ssl)
        {
            var sslStream = new SslStream(this.stream);
            sslStream.AuthenticateAsClient(remoteServerInfo.Host);
            this.stream = sslStream;
            await this.CloseIfCanceled(cancelationToken);
        }
    }
    catch(Exception)
    {
        this.CloseIfCanceled(cancelationToken).Wait();
        throw;
    }
}


There's also async methods on Stream that return Task's so, also your OnDisconected event was not thread safe, you need to assign it to an internal variable. You should also never pass null EventArgs. Also you can simplfiy BeginRecieve to just receive and put it in a loop with async/await and a cancellation token. Also I would remove the goto and replace with a do/while (btw I'm not 100% sure the reduce buffer size logic works, someone else might want to address that)

```
public async Task SendAsync(byte[] data, CancelationToken token = default(CancellationToken))
{
try
{
await this.stream.WriteAsync(data, 0, data.Length, token);
await this.stream.FlushAsync(token);
}
catch (IOException ex)
{
var onDisconected = this.OnDisconected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
; // ignore
else if (onDisconected != null)
onDisconected(this, EventArgs.Empty);
}
}
public async Task Recieve(CancelationToken token = default(CancellationToken))
{
try
{
if(!this.IsConnected || this.IsRecieving)
throw new InvalidOperationException();
this.IsRecieving = true;
byte[] buffer = new byte[bufferSize];
while(this.IsConnected)
{
token.ThrowIfCancellationRequested();
int bytesRead = await this.stream.ReadAsync(buffer, 0, buffer.Length, token);
if(bytesRead > 0)
{
if(bytesRead == buffer.Length)
this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
else
{
do
{
int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
if(bytesRead this.minBufferSize)
}
var onDataRecieved = this.OnDataRecieved;
if(onDataRecieved != null)
{
byte[] data = new byte[bytesRead];
Array.Copy(buffer, data, bytesRead);
onDataRecieved(this, data);
}
}
buffer = new byte[bufferSize];
}
}
catch(ObjectDisposedException){}
catch(IOException ex)
{
var evt = this.OnDisconnected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
;
if(evt != null)
evt(this, EventArgs.Empty);
}
finally
{

Code Snippets

private async Task Close()
{
   await Task.Yield();
   if(this.tcpClient != null)
   {

      this.tcpClient.Close();
      this.tcpClient.Dispose();
      this.tcpClient = null;
   }
   if(this.stream != null)
   {
       this.stream.Dispose();
       this.stream = null;
   }
}
private async Task CloseIfCanceled(CancellationTeken token, Action onClosed = null)
{
    if(token.IsCancellationRequested)
    {
        await this.Close();
        if(onClosed != null)
           onClosed();
        token.ThrowIfCancellationRequested();
    }
}
public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationToken cancellationToken = default(CancellationToken))
{
    try
    {
        //Connect async method
        await this.Close();
        cancellationToken.ThrowIfCancellationRequested();
        this.tcpClient = new TcpClient();
        canellationToken.ThrowIfCancellationRequested();
        await this.tcpClient.ConnectAsync(remoteServerInfo.Host, remoteServerInfo.Port);
        await this.CloseIfCanceled(cancelationToken);
        // get stream and do SSL handshake if applicable

        this.stream = this.tcpClient.GetStream();
        await this.CloseIfCanceled(cancelationToken);
        if (remoteServerInfo.Ssl)
        {
            var sslStream = new SslStream(this.stream);
            sslStream.AuthenticateAsClient(remoteServerInfo.Host);
            this.stream = sslStream;
            await this.CloseIfCanceled(cancelationToken);
        }
    }
    catch(Exception)
    {
        this.CloseIfCanceled(cancelationToken).Wait();
        throw;
    }
}
public async Task SendAsync(byte[] data, CancelationToken token = default(CancellationToken))
{
    try
    {
        await this.stream.WriteAsync(data, 0, data.Length, token);
        await this.stream.FlushAsync(token);
    }
    catch (IOException ex)
    {
         var onDisconected = this.OnDisconected;
         if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
            ; // ignore
        else if (onDisconected != null)
            onDisconected(this, EventArgs.Empty);
    }
}
public async Task Recieve(CancelationToken token = default(CancellationToken))
{
   try
   {
         if(!this.IsConnected || this.IsRecieving)
             throw new InvalidOperationException();
         this.IsRecieving = true;
         byte[] buffer = new byte[bufferSize];
         while(this.IsConnected)
         {
                  token.ThrowIfCancellationRequested();
                  int bytesRead = await this.stream.ReadAsync(buffer, 0, buffer.Length, token);
                  if(bytesRead > 0)
                  {
                       if(bytesRead == buffer.Length)
                          this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
                  else
                  {
                     do
                     {
                        int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
                        if(bytesRead < reducedBufferSize)
                           this.BufferSize = reducedBufferSize;

                     }
                     while(bytesRead > this.minBufferSize)
                  }
                  var onDataRecieved = this.OnDataRecieved;
                  if(onDataRecieved != null)
                  {
                     byte[] data = new byte[bytesRead];
                     Array.Copy(buffer, data, bytesRead);
                     onDataRecieved(this, data);
                  }
              }
              buffer = new byte[bufferSize];
         }
   }
   catch(ObjectDisposedException){}
   catch(IOException ex)
   {
       var evt = this.OnDisconnected; 
       if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
            ;
       if(evt != null)
            evt(this, EventArgs.Empty);
   }
   finally
   {
      this.IsRecieving = false;
   }
}

Context

StackExchange Code Review Q#82806, answer score: 9

Revisions (0)

No revisions yet.