Καλώς ορίσατε στο dotNETZone.gr - Σύνδεση | Εγγραφή | Βοήθεια

 

Αρχική σελίδα Ιστολόγια Συζητήσεις Εκθέσεις Φωτογραφιών Αρχειοθήκες

Ασύγχρονα Sockets

  •  23-09-2011, 18:16

    Ασύγχρονα Sockets

    Κάποιος ρώτησε πρόσφατα πως μπορεί να χρησιμοποιήσει το Task Parallel Library για να φτιάξει ένα ασύγχρονο socket server ο οποίος θα μπορεί να εξυπηρετεί πολλούς clients. Προφανώς θα πρέπει να χρησιμοποιηθούν οι ασύγχρονες μορφές των συναρτήσεων του TcpListener (BeginAccept αντί για Accept) κλπ. Το πρόβλημα είναι ότι ο κώδικας που δημιουργείται είναι λίγο μακαρόνι. Έκανα μία προσπάθεια που περιγράφω παρακάτω αλλά θα ήθελα σχόλια και βελτιώσεις.

    Καταρχήν, ο παρακάτω κώδικας είναι η σύγχρονη μορφή του server. Ο TcpListener δέχεται συνδέσεις σε ένα loop και ό,τι παραλάβει το αποθηκεύει σε ένα αρχείο. Το αξιοσημείωτο είναι ότι αυτή η μορφή είναι πολύ γρήγορη έτσι και τρέχει στο δικό της thread, καθώς δεν υπάρχει καθόλου κόστος από context switching ή την δημιουργία tasks. Χρειάστηκε να στείλω αρκετά μεγάλα μηνύματα (αρκετά KB) πριν αρχίσω να βλέπω διαφορά ανάμεσα σε αυτή τη μορφή και τις ασύγχρονες. Σημειωτέον, χρησιμοποίησα μέχρι 64 client threads τη φορά. Αν ο αριθμός client είναι πολύ μεγάλος, κάποιοι από αυτούς θα αρχίσουν να βλέπουν timeouts.
    public class TcpServer 
    {        
    	public void Start(int port)
    	{
    		var listener = new TcpListener(IPAddress.Any, port);
    		listener.Start();
    		Console.WriteLine("Server Started");
    
    		Accept(listener);
    	}
    
    	private void Accept(TcpListener listener)
    	{
    		while (true)
    		{
    			var client = listener.AcceptTcpClient();  //BLOCKING
    			Process(client);
    		}
    	}
    	
    
    	private static void Process(TcpClient client)
    	{
    		using (NetworkStream stream = client.GetStream())
    		{
    			var fileName = Path.GetTempFileName();
    			
    			using (var fileStream = File.Open(fileName,FileMode.Create))
    			{
    				stream.CopyTo(fileStream);	//BLOCKING                    
    			}
    			File.Delete(fileName);
    		}
    		client.Close();
    	}
    }
     O server ξεκινάει φυσικά με ένα Task.Factory.StartNew(()=>myServer.Start(12345)).

    Μία πρώτη βελτίωση είναι η επεξεργασία να γίνει ασύγχρονα. Αυτό γίνεται απλά τρέχοντας την Process στο δικό της task.
    private void Accept(TcpListener listener)
    {
        while (true)
        {
            var client = listener.AcceptTcpClient();
            Task.Factory.StartNew(() => Process(client));
        }
    }
    Επόμενο βήμα είναι να γίνεται ασύγχρονα και η αποδοχή των συνδέσεων, και εδώ είναι το μανίκι. Με την TaskFactory.FromAsync() μπορώ να φτιάξω ένα task από τις BeginAcceptClient, EndAcceptClient. Μόλις τελειώσει αυτό το task όμως θα πρέπει να δημιουργήσω άλλο ένα παρόμοιο για να παραλάβει την επόμενη σύνδεση. Δεν μπορώ να βάλω την FromAsync στο loop γιατί θα δημιουργηθούν άπειρα task. Πρέπει αναγκαστικά να δημιουργήσω το νέο task αφού ολοκληρωθεί το προηγούμενο. Και φυσικά ένα task που έχει ολοκληρωθεί δεν ξαναξεκινάει, για να πεις ότι θα έβαζα ένα restart στο Continue.

    Μία λύση είναι το recursion που πρότεινε ο Νίκος ο Παλλαδινός στη συζήτηση για το Ασύγχρονο Retry. Η Accept ξαναγράφεται ως εξής:
    private Task Accept(TcpListener listener)
    {
        return Task.Factory
            .FromAsync<TcpClient>(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient, listener)
            .ContinueWith(tc =>{
                Accept((TcpListener)tc.AsyncState);
                Process(tc.Result);
            });
    }
    Μόλις έρθει μία σύνδεση, καλείται ξανά η Accept για να παραλάβει την επόμενη και το Process γίνεται και αυτό στο δικό του ξεχωριστό task.

    Σειρά έχει να ξεφορτωθούμε και τα μπλοκαρίσματα κατά την αποθήκευση στα αρχεία. Και εδώ μπορούμε να δουλέψουμε ασύγχρονα όπως παραπάνω, χρησιμοποιώντας τις BeginRead/EndRead κλπ μαζί με την TaskFactory.FromAsync. Ή μπορούμε να κλέψουμε και να χρησιμοποιήσουμε τα stream extensions από τα ParallelExtensionsExtras, τα οποία έχουν έτοιμες ασύγχρονες μορφές για τις Stream.CopyTo, Stream.Read κλπ.
    private static Task Process(TcpClient client)
    {
    	NetworkStream stream = client.GetStream();
    	var fileName = Path.GetTempFileName();
    
    	return stream
    		.CopyStreamToFileAsync(fileName)
    	.ContinueWith(t =>{
    		var e = t.Exception;                                                                
    		stream.Close();
    		File.Delete(fileName);
    		if (e != null) throw e;
    	}, TaskContinuationOptions.ExecuteSynchronously) 
    	.ContinueWith(t2=>
    		client.Close());
    }       

    Όλος μαζί ο server γίνεται πλέον
    public class TcpServer 
    {        
    	public void Start(int port)
    	{
    		var listener = new TcpListener(IPAddress.Any, port);
    		listener.Start();
    		Console.WriteLine("Server Started");
    
    		Accept(listener);
    	}
    
    	private Task Accept(TcpListener listener)
    	{
    		return Task.Factory
    			.FromAsync<TcpClient>(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient, listener)
    			.ContinueWith(tc =>{
    				Accept((TcpListener)tc.AsyncState);
    				return ProcessAsync(tc.Result);
    			}).Unwrap();
    	}
    	
    
    	private static Task Process(TcpClient client)
    	{
    		NetworkStream stream = client.GetStream();
    		var fileName = Path.GetTempFileName();
    
    		return stream
    			.CopyStreamToFileAsync(fileName)
    		.ContinueWith(t =>{
    			var e = t.Exception;                                                                
    			stream.Close();
    			File.Delete(fileName);
    			if (e != null) throw e;
    		}, TaskContinuationOptions.ExecuteSynchronously) 
    		.ContinueWith(t3=>
    			client.Close());
    	}       
    }

    Τί γνώμη έχετε και πως θα μπορούσε να βελτιωθεί ο κώδικας? Πως θα μπορούσε να γίνει πιο καθαρός και πιο μαζεμένος? 

     Σκέφτηκα καταρχήν να χρησιμοποιήσω Iterator όπως στο retry αλλά δεν θα βόλευε καθώς θέλω τα δύο βήματα της διαδικασίας (accept, process) να μπορούν να εκτελούνται ανεξάρτητα. Με τον iterator θα έπρεπε να τελειώσει πρώτα το Process για να ξεκινήσει το επόμενο accept. Το παρακάτω δηλαδή δεν παίζει όπως θέλουμε:
    private IEnumerable<Task> AsyncIterator(TcpListener listener)
    {
    	while (true)
    	{
    		var acceptTask = Task.Factory
    			.FromAsync<TcpClient>(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient, listener);
    		yield return acceptTask;
    		var client = acceptTask.Result;
    		yield return Process(client);
    	}
    }
    Καμμία καλύτερη ιδέα?

    Παναγιώτης Καναβός, Freelancer
    Twitter: http://www.twitter.com/pkanavos
    Δημοσίευση στην κατηγορία: , ,
Δείτε όλες τις δημοσιεύσεις της Θεματική Ενότητας
Με χρήση του Community Server (Commercial Edition), από την Telligent Systems