patterncsharpMinor
AsyncTcpClient (Asynchronous TcpClient)
Viewed 0 times
asynchronoustcpclientasynctcpclient
Problem
I've been doing network programming using C#'s
The key methods are:
Some other things to notice:
I would really appreciate code review from other developers (especially from network programmers) to see if this implementation can be improved further. Some things that come to mind include better performance, better use of TAP over APM, and any possible subtle bugs I might have missed.
Here is the code for
```
public class AsyncTcpClient : IDisposable
{
private bool disposed = false;
private TcpClient tcpClient;
private Stream stream;
private int minBufferSize = 8192;
private int maxBufferSize = 15 1024 1024;
private int bufferSize = 8192;
private int BufferSize
{
get
{
return this.bufferSize;
}
set
{
this.bufferSize = va
TcpClient for several years. The code below is an asynchronous wrapper for TcpClient that I developed throughout these years.The key methods are:
ConnectAsync()- connects asynchronously;RemoteServerInfois a simple class containing Host, Port, and a boolean indicating whether this is an SSL connection.
StartReceiving()- initiates the data reading callbacks; this method is necessary to allow time for events to be hooked up before data starts being processed
DataReceivedCallback()- processes data as it is received and passes it on to any subscribed event handlers
SendAsync()- sends data asynchronously
Some other things to notice:
- The code uses the old Asynchronous Programming Model for asynchrony.
- There is some play with buffer sizes. The intention of this is to have adaptive buffer sizes - using a small amount of memory most of the time, but expanding to better cater for larger incoming data (up to a specified maximum) when necessary.
- I am using a
gotostatement. This might send cold shivers down your spine, but I thought it was fine in this case. Read this answer if you're religious about never usinggotoin any situation whatsoever.
I would really appreciate code review from other developers (especially from network programmers) to see if this implementation can be improved further. Some things that come to mind include better performance, better use of TAP over APM, and any possible subtle bugs I might have missed.
Here is the code for
AsyncTcpClient:```
public class AsyncTcpClient : IDisposable
{
private bool disposed = false;
private TcpClient tcpClient;
private Stream stream;
private int minBufferSize = 8192;
private int maxBufferSize = 15 1024 1024;
private int bufferSize = 8192;
private int BufferSize
{
get
{
return this.bufferSize;
}
set
{
this.bufferSize = va
Solution
You should only use Task.Run to start threads, which you don't want to do when you're doing IO, at least directly. You should let the runtime make that decision. Also you need to make sure your tcpClient isn't already connected. There's also a tcpClient.ConnectAsync that retruns a Task so you should use that.
Also you should never pass a CancellationTokenSource to an async method, use CancellationToken. And the .NET async library is designed to throw an OperationCanceledException on cancelation so the task is marked canceled, so use that. It's also bad practice to dispose of a class from within itself, that can have some very undeseried effects, simply close and dispose of your tcpListener
So ConnectAsync could look like:
There's also async methods on Stream that return Task's so, also your OnDisconected event was not thread safe, you need to assign it to an internal variable. You should also never pass null EventArgs. Also you can simplfiy BeginRecieve to just receive and put it in a loop with async/await and a cancellation token. Also I would remove the goto and replace with a do/while (btw I'm not 100% sure the reduce buffer size logic works, someone else might want to address that)
```
public async Task SendAsync(byte[] data, CancelationToken token = default(CancellationToken))
{
try
{
await this.stream.WriteAsync(data, 0, data.Length, token);
await this.stream.FlushAsync(token);
}
catch (IOException ex)
{
var onDisconected = this.OnDisconected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
; // ignore
else if (onDisconected != null)
onDisconected(this, EventArgs.Empty);
}
}
public async Task Recieve(CancelationToken token = default(CancellationToken))
{
try
{
if(!this.IsConnected || this.IsRecieving)
throw new InvalidOperationException();
this.IsRecieving = true;
byte[] buffer = new byte[bufferSize];
while(this.IsConnected)
{
token.ThrowIfCancellationRequested();
int bytesRead = await this.stream.ReadAsync(buffer, 0, buffer.Length, token);
if(bytesRead > 0)
{
if(bytesRead == buffer.Length)
this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
else
{
do
{
int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
if(bytesRead this.minBufferSize)
}
var onDataRecieved = this.OnDataRecieved;
if(onDataRecieved != null)
{
byte[] data = new byte[bytesRead];
Array.Copy(buffer, data, bytesRead);
onDataRecieved(this, data);
}
}
buffer = new byte[bufferSize];
}
}
catch(ObjectDisposedException){}
catch(IOException ex)
{
var evt = this.OnDisconnected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
;
if(evt != null)
evt(this, EventArgs.Empty);
}
finally
{
Also you should never pass a CancellationTokenSource to an async method, use CancellationToken. And the .NET async library is designed to throw an OperationCanceledException on cancelation so the task is marked canceled, so use that. It's also bad practice to dispose of a class from within itself, that can have some very undeseried effects, simply close and dispose of your tcpListener
So ConnectAsync could look like:
private async Task Close()
{
await Task.Yield();
if(this.tcpClient != null)
{
this.tcpClient.Close();
this.tcpClient.Dispose();
this.tcpClient = null;
}
if(this.stream != null)
{
this.stream.Dispose();
this.stream = null;
}
}
private async Task CloseIfCanceled(CancellationTeken token, Action onClosed = null)
{
if(token.IsCancellationRequested)
{
await this.Close();
if(onClosed != null)
onClosed();
token.ThrowIfCancellationRequested();
}
}
public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationToken cancellationToken = default(CancellationToken))
{
try
{
//Connect async method
await this.Close();
cancellationToken.ThrowIfCancellationRequested();
this.tcpClient = new TcpClient();
canellationToken.ThrowIfCancellationRequested();
await this.tcpClient.ConnectAsync(remoteServerInfo.Host, remoteServerInfo.Port);
await this.CloseIfCanceled(cancelationToken);
// get stream and do SSL handshake if applicable
this.stream = this.tcpClient.GetStream();
await this.CloseIfCanceled(cancelationToken);
if (remoteServerInfo.Ssl)
{
var sslStream = new SslStream(this.stream);
sslStream.AuthenticateAsClient(remoteServerInfo.Host);
this.stream = sslStream;
await this.CloseIfCanceled(cancelationToken);
}
}
catch(Exception)
{
this.CloseIfCanceled(cancelationToken).Wait();
throw;
}
}There's also async methods on Stream that return Task's so, also your OnDisconected event was not thread safe, you need to assign it to an internal variable. You should also never pass null EventArgs. Also you can simplfiy BeginRecieve to just receive and put it in a loop with async/await and a cancellation token. Also I would remove the goto and replace with a do/while (btw I'm not 100% sure the reduce buffer size logic works, someone else might want to address that)
```
public async Task SendAsync(byte[] data, CancelationToken token = default(CancellationToken))
{
try
{
await this.stream.WriteAsync(data, 0, data.Length, token);
await this.stream.FlushAsync(token);
}
catch (IOException ex)
{
var onDisconected = this.OnDisconected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
; // ignore
else if (onDisconected != null)
onDisconected(this, EventArgs.Empty);
}
}
public async Task Recieve(CancelationToken token = default(CancellationToken))
{
try
{
if(!this.IsConnected || this.IsRecieving)
throw new InvalidOperationException();
this.IsRecieving = true;
byte[] buffer = new byte[bufferSize];
while(this.IsConnected)
{
token.ThrowIfCancellationRequested();
int bytesRead = await this.stream.ReadAsync(buffer, 0, buffer.Length, token);
if(bytesRead > 0)
{
if(bytesRead == buffer.Length)
this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
else
{
do
{
int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
if(bytesRead this.minBufferSize)
}
var onDataRecieved = this.OnDataRecieved;
if(onDataRecieved != null)
{
byte[] data = new byte[bytesRead];
Array.Copy(buffer, data, bytesRead);
onDataRecieved(this, data);
}
}
buffer = new byte[bufferSize];
}
}
catch(ObjectDisposedException){}
catch(IOException ex)
{
var evt = this.OnDisconnected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
;
if(evt != null)
evt(this, EventArgs.Empty);
}
finally
{
Code Snippets
private async Task Close()
{
await Task.Yield();
if(this.tcpClient != null)
{
this.tcpClient.Close();
this.tcpClient.Dispose();
this.tcpClient = null;
}
if(this.stream != null)
{
this.stream.Dispose();
this.stream = null;
}
}
private async Task CloseIfCanceled(CancellationTeken token, Action onClosed = null)
{
if(token.IsCancellationRequested)
{
await this.Close();
if(onClosed != null)
onClosed();
token.ThrowIfCancellationRequested();
}
}
public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationToken cancellationToken = default(CancellationToken))
{
try
{
//Connect async method
await this.Close();
cancellationToken.ThrowIfCancellationRequested();
this.tcpClient = new TcpClient();
canellationToken.ThrowIfCancellationRequested();
await this.tcpClient.ConnectAsync(remoteServerInfo.Host, remoteServerInfo.Port);
await this.CloseIfCanceled(cancelationToken);
// get stream and do SSL handshake if applicable
this.stream = this.tcpClient.GetStream();
await this.CloseIfCanceled(cancelationToken);
if (remoteServerInfo.Ssl)
{
var sslStream = new SslStream(this.stream);
sslStream.AuthenticateAsClient(remoteServerInfo.Host);
this.stream = sslStream;
await this.CloseIfCanceled(cancelationToken);
}
}
catch(Exception)
{
this.CloseIfCanceled(cancelationToken).Wait();
throw;
}
}public async Task SendAsync(byte[] data, CancelationToken token = default(CancellationToken))
{
try
{
await this.stream.WriteAsync(data, 0, data.Length, token);
await this.stream.FlushAsync(token);
}
catch (IOException ex)
{
var onDisconected = this.OnDisconected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
; // ignore
else if (onDisconected != null)
onDisconected(this, EventArgs.Empty);
}
}
public async Task Recieve(CancelationToken token = default(CancellationToken))
{
try
{
if(!this.IsConnected || this.IsRecieving)
throw new InvalidOperationException();
this.IsRecieving = true;
byte[] buffer = new byte[bufferSize];
while(this.IsConnected)
{
token.ThrowIfCancellationRequested();
int bytesRead = await this.stream.ReadAsync(buffer, 0, buffer.Length, token);
if(bytesRead > 0)
{
if(bytesRead == buffer.Length)
this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
else
{
do
{
int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
if(bytesRead < reducedBufferSize)
this.BufferSize = reducedBufferSize;
}
while(bytesRead > this.minBufferSize)
}
var onDataRecieved = this.OnDataRecieved;
if(onDataRecieved != null)
{
byte[] data = new byte[bytesRead];
Array.Copy(buffer, data, bytesRead);
onDataRecieved(this, data);
}
}
buffer = new byte[bufferSize];
}
}
catch(ObjectDisposedException){}
catch(IOException ex)
{
var evt = this.OnDisconnected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
;
if(evt != null)
evt(this, EventArgs.Empty);
}
finally
{
this.IsRecieving = false;
}
}Context
StackExchange Code Review Q#82806, answer score: 9
Revisions (0)
No revisions yet.