Καλώς ορίσατε στο dotNETZone.gr - Σύνδεση | Εγγραφή | Βοήθεια

Asyncrhonous programming in .NET Made Simple: From Delegates to the Task Parallel Library

A few people have been asking lately for samples of using asynchronous programming using Sockets or Pipes so I decided to put together some samples of the various asynchronous programming models using .NET. The scenario used in all the examples is the same:

  1. A server pipe is created and starts waiting for connections
  2. A client pipe is created and connects to the server pipe
  3. The server starts listening for data
  4. The client sends a UTF8 formatted number to the server asynchronously and waits for a response
  5. The server responds with “You sent x” to the client asynchronously
  6. The client reads the response and writes the response to the console.

The asynchronous version of a function (e.g. BeginRead, BeginWrite, BeginWaitForConnection) is used whenever one is available. All samples implement a simple interface to make running the samples easier:

public interface  IAsyncBase
{
        void StartServer();
        void StartClient();
}
IAsyncBase sample=new TaskSockets();
sample.StartServer();
sample.StartClient();

.NET 1.0 – Delegates

The first option is Delegates, available since good old .NET 1.0. Delegates made asynchronous programming easier than the Windows API by making it easier to use callbacks and removing some explicit thread handling concerns. Unfortunately, the resulting code is a bit verbose

   public class AsyncPipes : IAsyncBase
   {
       protected NamedPipeServerStream serverPipe = new NamedPipeServerStream("MyName", PipeDirection.InOut, 100, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
       protected NamedPipeClientStream clientPipe = new NamedPipeClientStream(".", "MyName", PipeDirection.InOut, PipeOptions.Asynchronous);
       byte[] clientBuffer = new byte[20];
       byte[] serverBuffer = new byte[20];

       public void StartServer()
       {
           serverPipe.BeginWaitForConnection(AfterServerConnect,null);
       }

       public void StartClient()
       {
           clientPipe.Connect();
           var output = Encoding.UTF8.GetBytes("5");
           clientPipe.BeginWrite(output, 0, output.Length, AfterClientWrite, null);
       }

       public void AfterClientWrite(IAsyncResult a)
       {
           clientPipe.EndWrite(a);
           clientPipe.BeginRead(clientBuffer, 0, clientBuffer.Length, AfterClientRead, null);
       }

       public void AfterServerConnect(IAsyncResult a)
       {
           serverPipe.EndWaitForConnection(a);
           serverPipe.BeginRead(serverBuffer, 0, serverBuffer.Length, AfterServerRead, null);
       }

       public void AfterServerRead(IAsyncResult b)
       {
           int count = serverPipe.EndRead(b);
           var input = Encoding.UTF8.GetString(serverBuffer, 0, count);
           string message = String.Format("You sent {0}", input);
           byte[] messageBytes = Encoding.UTF8.GetBytes(message);
           serverPipe.BeginWrite(messageBytes, 0, messageBytes.Length, AfterServerWrite, null);
       }

       public void AfterClientRead(IAsyncResult b)
       {
           int count = clientPipe.EndRead(b);
           string message = Encoding.UTF8.GetString(clientBuffer, 0, count);
           Console.WriteLine(message);
       }

       public void AfterServerWrite(IAsyncResult c)
       {
           serverPipe.EndWrite(c);
       }


   }

 

The code is long and hard to follow. A separate function is needed for each callback so the simple logic of the two endpoints (Connect, Read, Response) get scattered all over the class.

.NET 3.0 – Lambdas

Lambdas made things quite a bit easier in .NET 3.0  by allowing us to embed the callbacks:

 
public class AsyncPipesLambda : IAsyncBase
{
    protected NamedPipeServerStream serverPipe = new NamedPipeServerStream("MyName", PipeDirection.InOut, 100, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
    protected NamedPipeClientStream clientPipe = new NamedPipeClientStream(".", "MyName", PipeDirection.InOut, PipeOptions.Asynchronous);
    byte[] clientBuffer = new byte[20];
    byte[] serverBuffer = new byte[20];

    public void StartServer()
    {
        serverPipe.BeginWaitForConnection(a =>
        {
            serverPipe.EndWaitForConnection(a);
            serverPipe.BeginRead(serverBuffer, 0, serverBuffer.Length, b =>
                    {
                        int count = serverPipe.EndRead(b);
                        var input = Encoding.UTF8.GetString(serverBuffer, 0, count);
                        string message = String.Format("You sent {0}", input);
                        byte[] messageBytes = Encoding.UTF8.GetBytes(message);
                        serverPipe.BeginWrite(messageBytes, 0, messageBytes.Length,
                            c =>serverPipe.EndWrite(c), null);
                    }, null);
        }, null);
    }

    public void StartClient()
    {
        clientPipe.Connect();
        var output = Encoding.UTF8.GetBytes("5");
        clientPipe.BeginWrite(output, 0, output.Length, a =>
        {
            clientPipe.EndWrite(a);
            clientPipe.BeginRead(clientBuffer, 0, clientBuffer.Length, b =>
            {
                int count = clientPipe.EndRead(b);
                string message = Encoding.UTF8.GetString(clientBuffer, 0, count);
                Console.WriteLine(message);
            }, null);
        }
        , null);
    }
}

The code is now more concise, easier to read and write. The logic of the client and server are now clear. We still have to nest one callback into the other and that makes it hard to compose multiple asyncrhonous steps in one method.

.NET 4.0 – Task Parallel Library

That changes with .NET 4.0 and the Task Parallel Library which offers primitives for asynchronous programming as well as the parallel execution of tasks. Each asynchronous step can now be mapped to a task using the FromAsync method. Continuation from one step to another is expressed using ContinueWith. In fact, it is now possible to create extension methods for each asynchronous stream operation that make the asynchronous code look a lot more like normal code. The WriteAsync, ReadAsync methods in the following class come from the the ParallelExtensionsExtras library.

 

public class TaskPipes:IAsyncBase
{
    protected NamedPipeServerStream serverPipe = new NamedPipeServerStream("MyName", PipeDirection.InOut, 100, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
    protected NamedPipeClientStream clientPipe = new NamedPipeClientStream(".", "MyName", PipeDirection.InOut, PipeOptions.Asynchronous);
    byte[] clientInput = new byte[20];

    public void StartServer()
    {            
        Task.Factory.FromAsync(serverPipe.BeginWaitForConnection, serverPipe.EndWaitForConnection, serverPipe).
            ContinueWith(t =>
            {
                byte[] serverInput = new byte[20];
                serverPipe.ReadAsync(serverInput, 0, serverInput.Length)
                    .ContinueWith(rt =>
                    {
                        var input = Encoding.UTF8.GetString(serverInput, 0, rt.Result);
                        string message = String.Format("You sent {0}", input);
                        byte[] messageBytes = Encoding.UTF8.GetBytes(message);
                        serverPipe.WriteAsync(messageBytes, 0, messageBytes.Length);
                    });
            }
        );

    }


    public void StartClient()
    {
        Task.Factory.StartNew(() =>
        {
            clientPipe.Connect();
            var output = Encoding.UTF8.GetBytes("5");
            clientPipe.WriteAsync(output, 0, output.Length)
            .ContinueWith(_ => clientPipe.ReadAsync(clientInput,0,clientInput.Length)
                .ContinueWith(t=>
                {
                    string message = Encoding.UTF8.GetString(clientInput, 0, t.Result);
                    Console.WriteLine(message);
                }));
        });
    }

}

The TPL and ParallelExtensionsExtras offer more than just eliminating the Begin/EndXXX methods. It is now possible to create iterators over a step of tasks and even use LINQ on them. ParallelExtensionsExtras include a set of extension methods for streams (ReadAllBytesAsync, CopyStreamToStreamAsync etc) that make working with streams a lot easier by using iterators over tasks.

Using sockets instead of pipes is just as easy. Instead of a server and a client pipe, we use a TcpListener and TcpClient objects.:

 

 

public class TaskSockets:IAsyncBase
{
    public const int PORT = 10901;
    
    protected TcpListener server = new TcpListener(IPAddress.Any ,PORT);
    protected TcpClient client = new TcpClient();
    byte[] clientInput = new byte[20];

    public void StartServer()
    {            
        server.Start();
        Task.Factory.FromAsync<TcpClient>(server.BeginAcceptTcpClient, server.EndAcceptTcpClient, server).
            ContinueWith(t =>
            {
                byte[] serverInput = new byte[20];
                var stream=t.Result.GetStream();
                stream.ReadAsync(serverInput, 0, serverInput.Length) 
                    .ContinueWith(rt =>
                    {
                        var input = Encoding.UTF8.GetString(serverInput, 0, rt.Result);
                        string message = String.Format("You sent {0}", input);
                        byte[] messageBytes = Encoding.UTF8.GetBytes(message);
                        stream.WriteAsync(messageBytes, 0, messageBytes.Length);
                    });
            }
        );

    }


    public void StartClient()
    {
        Task.Factory.FromAsync(client.BeginConnect,client.EndConnect,"localhost",PORT,null)
            .ContinueWith(c=>
        {                
            var output = Encoding.UTF8.GetBytes("5");
            var stream = client.GetStream();
            stream.WriteAsync(output, 0, output.Length)
            .ContinueWith(_ => stream.ReadAsync(clientInput,0,clientInput.Length)
            .ContinueWith(t=>
                {
                    string message = Encoding.UTF8.GetString(clientInput, 0, t.Result);
                    Console.WriteLine(message);
                }));
        });
    }

}

Still in research – Reactive Extensions


The latest evolution in asynchronous programming comes with the Reactive Extensions for .NET which allow us to write asynchronous programs almost as if we were writing old-fashioned synchronous programs:

public class ReactivePipes:IAsyncBase
{
    protected NamedPipeServerStream serverPipe = new NamedPipeServerStream("MyName", PipeDirection.InOut, 100, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
    protected NamedPipeClientStream clientPipe = new NamedPipeClientStream(".", "MyName", PipeDirection.InOut, PipeOptions.Asynchronous);

    public void StartServer()
    {           
        
        var connect = Observable.FromAsyncPattern(serverPipe.BeginWaitForConnection, serverPipe.EndWaitForConnection);
        var read=Observable.FromAsyncPattern<byte[],int,int,int>(serverPipe.BeginRead,serverPipe.EndRead);
        var write=Observable.FromAsyncPattern<byte[],int,int>(serverPipe.BeginWrite,serverPipe.EndWrite);
        
        connect().Subscribe(u =>
            {
                byte[] msg = new byte[20];
                read(msg, 0, msg.Length).Subscribe(i =>
                {
                    var input =Encoding.UTF8.GetString(msg, 0,i);
                    string message =String.Format("You sent {0}",input);
                    byte[] buffer =Encoding.UTF8.GetBytes(message);
                    write(buffer, 0, buffer.Length);
                });
            });
    }


    public void StartClient()
    {
        var read = Observable.FromAsyncPattern<byte[], int, int, int>(clientPipe.BeginRead, serverPipe.EndRead);
        var write = Observable.FromAsyncPattern<byte[], int, int>(clientPipe.BeginWrite, serverPipe.EndWrite);

        clientPipe.Connect();
        var output = Encoding.UTF8.GetBytes("5");
        write(output, 0, output.Length).Subscribe(u =>
        {
            byte[] bytes = new byte[20];
            read(bytes, 0, bytes.Length).Subscribe(i =>
            {
                string message = Encoding.UTF8.GetString(bytes, 0, i);
                Console.WriteLine(message);
            });
        });
    }
}

 

This time, instead of defining a delegate or a task, we define an Observable (a source of events) and subscribe to it, passing the code we want executed when the event we want to observe occurs. In this case, we subscribe to the EndWrite, EndRead etc. events.

From a scattered mess of callbacks to code that almost reads like a normal function. Not bad, not bad at all.

P.S. Visual Studio has excellent support for parallel debugging, developed by a Greek guy and his team. So be sure to visit Daniel Moth's blog and thank him for an excellent job.

Έχουν δημοσιευτεί Παρασκευή, 2 Ιουλίου 2010 10:13 πμ από το μέλος Παναγιώτης Καναβός

Ενημέρωση για Σχόλια

Αν θα θέλατε να λαμβάνετε ένα e-mail όταν γίνονται ανανεώσεις στο περιεχόμενο αυτής της δημοσίευσης, παρακαλούμε γίνετε συνδρομητής εδώ

Παραμείνετε ενήμεροι στα τελευταία σχόλια με την χρήση του αγαπημένου σας RSS Aggregator και συνδρομή στη Τροφοδοσία RSS με σχόλια

Σχόλια:

Χωρίς Σχόλια

Ποιά είναι η άποψή σας για την παραπάνω δημοσίευση;

(απαιτούμενο) 
απαιτούμενο 
(απαιτούμενο) 
Εισάγετε τον κωδικό:
CAPTCHA Image