Κάνεις ένα βασικό λάθος όταν προσπαθείς εσύ να ορίσεις πόσα task θα τρέξουν. Ένα task ΔΕΝ αντιστοιχεί σε ένα thread. To TPL παίρνει τα task που του δίνεις και τα εκτελεί σε δικά του threads τα οποία τραβάει από δικό του thread pool. Είναι δουλειά του TPL να δημιουργήσει τόσα threads όσα μπορεί να εκμεταλλευτεί το μηχάνημα.
Υποθέτω ότι αφού προσπαθείς να δημιουργήσεις συγκεκριμένο αριθμό tasks μάλλον τα χρησιμοποιείς σαν να είναι ξεχωριστά threads και προσπαθείς το καθένα να διαβάσει από ένα URL μέσα σε loop. Αυτό είναι το δεύτερο λάθος, καθώς ουσιαστικά αχρηστεύει τα tasks. Αντί για tasks που κάνουν ένα συγκεκριμένο πράγμα έχεις τώρα threads τα οποία προσπαθούν να κάνουν πολλά πράγματα μαζί. Λογικό είναι να είναι απαραίτητος αλλά και δύσκολος ο συγχρονισμός μεταξύ αυτών των threads. Ουσιαστικά με αυτό τον τρόπο έχεις αχρηστεύσει τα πλεονεκτήματα που σου δίνει η TPL.
Μία καλύτερη λύση θα ήταν να βάλεις στό BlockingCollection όλα τα URL προς επεξεργασία και κάθε ένα που το επεξεργάζεσαι να το βάζεις σε ένα ConcurrentDictionary. Το dictionary θα πρέπει να το ελέγεις πριν προσθέσεις ένα νέο URL στο queue ή όταν τραβάς ένα για να το επεξεργαστείς. Κάθε φορά πο βρίσκεις ένα νέο URL κατά την επεξεργασία, θα το προσθέτεις και αυτό στο queue. Ο κώδικας θα είναι κάπως έτσι:
var queue = new BlockingCollection<Uri>();
var visitedUris=new ConcurrentDictionary<Uri, Uri>();
Parallel.ForEach(queue.GetConsumingEnumerable(),
uri =>
{
//Skip if visited
if (visitedUris.ContainsKey(uri))
return;
using (var client = new WebClient())
{
var pageContent = client.DownloadString(uri);
var discovered = Process(pageContent);
var newUris = from newUri in discovered
where !visitedUris.ContainsKey(newUri)
select newUri;
foreach (var newUri in newUris)
{
queue.Add(newUri);
}
//Don't care if it fails
visitedUris[uri] = uri;
}
});
Με τον τρόπο αυτό αφήνεις το framework να δημιουργήσει όσα threads χρειάζεται αλλά μπορείς και να ελέγχεις εύκολα αν έχεις ήδη επεξεργασθεί μία διεύθυνση.
Εδώ τώρα μπορούν να γίνουν διάφορες τροποποιήσεις. Μπορείς να εκτελέσεις το download ασύγχρονα, εκμεταλλευόμενος την TaskFactory.FromAsync ή να χρησιμοποιήσεις την WebClient.DownloadStringTask από τα
ParallelExtensionExtras για να κατεβάσεις τις σελίδες ασύγχρονα. Μετά μπορείς να κάνεις την επεξεργασία σε ένα άλλο task με την Task.ContinueWith. Ο κώδικας θα γίνει κάπως έτσι:
Parallel.ForEach(queue.GetConsumingEnumerable(),
uri =>
{
//Skip if visited
if (visitedUris.ContainsKey(uri))
return;
var client = new WebClient();
var pageTask= client.DownloadStringTask(uri);
var processTask = pageTask.ContinueWith(p =>
{
client.Dispose();
var content = p.Result;
return Process(content);
});
processTask.ContinueWith(t=>
{
var discovered=t.Result;
var newUris = from newUri in discovered
where !visitedUris.ContainsKey(newUri)
select newUri;
foreach (var newUri in newUris)
{
queue.Add(newUri);
}
//Don't care if it fails
visitedUris[uri] = uri;
});
});
Το ωραίο σε όλη τη διαδικασία είναι ότι εσύ δεν χρειάζεται ποτέ να ορίσεις τον αριθμό των Task ή των Threads. Το αναλαμβάνει αυτό το framework.
Παναγιώτης Καναβός, Freelancer
Twitter: http://www.twitter.com/pkanavos