Καλώς ορίσατε στο dotNETZone.gr - Σύνδεση | Εγγραφή | Βοήθεια
σε

 

Αρχική σελίδα Ιστολόγια Συζητήσεις Εκθέσεις Φωτογραφιών Αρχειοθήκες

Το SQLite, ένα Job Queue, η F# και οι Agents και πως φτιάχνουμε τη Scan?

Îåêßíçóå áðü ôï ìÝëïò Παναγιώτης Καναβός. Τελευταία δημοσίευση από το μέλος Παναγιώτης Καναβός στις 04-10-2011, 10:19. Υπάρχουν 0 απαντήσεις.
Ταξινόμηση Δημοσιεύσεων: Προηγούμενο Επόμενο
  •  04-10-2011, 10:19 67620

    Το SQLite, ένα Job Queue, η F# και οι Agents και πως φτιάχνουμε τη Scan?

    Χρησιμοποιώ το SQLite μέσω Castle.ActiveRecord ως ultra-lightweight embedded βάση σε μία dropbox-οειδή εφαρμογή και συνάντησα ένα ενδιαφέρον πρόβλημα. Το SQLite κλειδώνει ολόκληρο το αρχείο, δηλαδή ολόκληρη τη βάση, όταν κάποιος γράφει. Επιπλέον, αντί να μπλοκάρει η εγγραφή μέχρι να ελευθερωθεί, ρίχνει exception (αυτό βέβαια μπορεί να είναι πρόβλημα του driver ή του Castle.ActiveRecord). Ακόμα και αν δεν έριχνε exception όμως η καθυστέρηση που θα προκαλούσε στα x threads που ήθελαν να γράψουν θα ήταν καταστροφική.

    Σκέφτηκα αρχικά να φτιάξω ένα Job Queue, μετά από όλες τις συζητήσεις που έχουν γίνει πρόσφατα για functions, lambdas, γιατί το Sharepoint δέχεται lambdas αντί για ονόματα properties. Έφτιαξα λοιπόν ένα BlockingCollection<Action> στο οποίο από τη μία πλευρά βάζω lambdas για εκτέλεση και από την άλλη έχω ένα thread το οποίο τραβάει τα actions ένα-ένα και τα εκτελεί. Το JobQueue είναι εντελώς γενικό, απλά το χρησιμοποίησα καταρχήν για να εκτελώ db actions:
        public class JobQueue
        {
            private readonly BlockingCollection<Action> _actionQueue = new BlockingCollection<Action>();
            private CancellationToken _cancellationToken;
            
    
            public void Start(CancellationToken token)
            {
                _cancellationToken = token;
                Task.Factory.StartNew(ProcessActions, _cancellationToken);
            }
    
            private void ProcessActions()
            {
                foreach (var action in _actionQueue.GetConsumingEnumerable())
                {
                    action();
                }
            }
    
            public void Add(Action action)
            {
                _actionQueue.Add(action);
            }
    
            public void Stop()
            {
                _actionQueue.CompleteAdding();
            }
           
        }   
    Και η η προσθήκη ενός νέου action γίνεται ως εξής:
    myJobQueue.Post(() =>
    {
            var filePath = path.ToLower();
            var state = FileState.Queryable.First(s=> s.FilePath == filePath);
            state.Status=FileStatus.Pending;
            state.Save();
    });
    Έτσι μπορώ να εκτελέσω άμεσα και ασύγχρονα όλα τα db actions, ενώ είμαι σίγουρος ότι θα εκτελεστούν ένα τη φορά με τη σειρά που τα πρόσθεσα. 

    Μετά όμως άρχισε η κουβέντα με το ασύγχρονο Retry και τα ασύγχρονα Sockets και ο Παλλαδινός με έπρηξε να μου μιλάει συνέχεια για τα agents της F#. Όποιος ασχολείται με Scala θα τα ξέρει ως actors. Στην ουσία είναι ο παρακάτω εξής απλός κώδικας:
    type Agent<'T> = MailboxProcessor<'T>
     
    let agent =
       Agent.Start(fun inbox ->
         async { while true do
                   let! msg = inbox.Receive()
                   printfn "got message '%s'" msg } )

    Κάνουμε post τα μηνύματα που θέλουμε να εκτελεστούν στον agent και αυτός τα τραβάει σε ένα ξεχωριστό thread και τα εκτελεί. 

    Η ιδέα μου φάνηκε πολύ πιο απλή και γενική από το Job Queue και έτσι δοκίμασα να φτιάξω κι εγώ κάτι αντίστοιχο σε C#. Η κλάση είναι η παρακάτω:
     public class Agent<TMessage> : IDisposable
        {
            private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
            private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
            public CancellationToken CancellationToken;
    
            private readonly Action<Agent<TMessage>> _process;
    
    
            public Agent(Action<Agent<TMessage>> action)
            {
                _process = action;
                CancellationToken = _cancelSource.Token;
            }
    
            public void Post(TMessage message)
            {
                _messages.Add(message);
            }
    
            public Task<TMessage> Receive(int timeout = -1)
            {
                return Task<TMessage>.Factory.StartNew(() =>
                {
                    TMessage item;
                    if (!_messages.TryTake(out item, timeout, CancellationToken))
                        throw new TimeoutException();
                    return item;
                });
            }
    
            public Task<TMessage> TryReceive(int timeout = -1)
            {
                return Task<TMessage>.Factory.StartNew(() =>
                {
                    TMessage item;
                    _messages.TryTake(out item, timeout, CancellationToken);
                    return item;
                });
            }
    
    
    
    
            public void Start()
            {
                Task.Factory.StartNew(() => _process(this), CancellationToken);
            }
    
    
    
            public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
            {
                var agent = new Agent<TMessage>(action);
                agent.Start();
                return agent;
            }
    
            public void Stop()
            {
                _messages.CompleteAdding();
                _cancelSource.Cancel();
            }
    
            public void DoAsync(Action action)
            {
                Task.Factory.StartNew(action, CancellationToken);
            }
    
    
            ~Agent()
            {
                Dispose(false);
            }
    
            public void Dispose()
            {
                Dispose(true);
                GC.SuppressFinalize(this);
            }
    
            protected void Dispose(bool disposing)
            {
                if (disposing)
                {
                    Stop();
                    _messages.Dispose();
                    _cancelSource.Dispose();
                }
            }
    
            public void AddFromEnumerable(IEnumerable<TMessage> enumerable)
            {
                foreach (var message in enumerable)
                {
                    Post(message);
                }
            }
    
            public IEnumerable<TMessage> GetEnumerable()
            {
                return _messages;
            }
        }

    Η δημιουργία ενός νέου agent γίνεται ως εξής:
    _agent = Agent<MyMessage>.Start(inbox =>
                {
                    Action loop = null;
                    loop = () =>
                    {
                        var messageTask = inbox.Receive();
                        var process = messageTask.ContinueWith(t =>
                        {
                            var message = t.Result;
                            //Do stuff with my message here
    
                            inbox.DoAsync(loop);
                        });
                    };
                    loop();
                });
    ενώ η κλήση γίνεται με ένα απλό _agent.Post(someMessage);
    Προφανώς το TMessage μπορεί να είναι οτιδήποτε, ένα data structure ή ένα Action. Το ωραίο είναι ότι μπορώ να δημιουργήσω ένα agent πολύ εύκολα ακριβώς στο σημείο που τον χρειάζομαι, και να του πω ακριβώς πως θέλω να επεξεργαστεί τα μηνύματα. Με το JobQueue ήμουν περιορισμένος σε ένα είδος υλοποίησης και μόνο. 

    Από την άλλη, η σύνταξη που αναγκάστηκα να χρησιμοποιήσω είναι αρκετά πιο στριφνή σε σχέση με την F#. Ο κώδικας με τα bold δείχνει το "θόρυβο" που αναγκάστηκα να προσθέσω για να έχω το αποτέλεσμα που θέλω:
    1. Για να μπορέσω να εκτελέσω recursion χρειάστηκε να ορίσω το loop πριν του δώσω τιμή. Διαφορετικά ο compiler αρνείται να χρησιμοποιήσει την loop γιατί θεωρεί ότι είναι undefined.
    2. Ο κώδικας χρησιμοποιεί tasks με αποτέλεσμα να γεμίζει ContinueWith και t.Result τα οποία κάνουν τον κώδικα πιο δύσκολο στην ανάγνωση
    3. Η DoAsync κρύβει ένα StartNew που χρησιμοποιεί το CancellationSource ολόκληρου του agent. Αυτό γίνεται γιατί θέλω να σταματήσει η επεργασία τόσο των μηνυμάτων όσο και του loop όταν κάνω Stop.
    Φυσικά τα πράγματα θα γίνουν πολύ ευκολότερα με την C# 5 και τα async, await keywords. Ο θόρυβος προστέθηκε γιατί είμαι αναγκασμένος να χειριστώ τα tasks και το cancellation με το χέρι, ενώ η C# θα τα αναλάβει "μαγικά" όπως κάνει τώρα και η F#.

    Ακόμα δεν έχω υλοποιήσει όλες τις μεθόδους του MailBoxProcessor, της κλάσης στην οποία βασίζονται οι F# agents. Συγκεκριμένα, η χρήση BlockingCollection σημαίνει ότι δεν μπορώ να υλοποιήσω εύκολα την Scan. Κανένα από τα concurrent collections δεν περιέχει κάποιο Remove function, ενώ η Scan αφαιρεί από το queue τα μηνύματα που βρίσκει.

    Καμμία ιδέα για να βοηθήσει κανείς?

    Παναγιώτης Καναβός, Freelancer
    Twitter: http://www.twitter.com/pkanavos
    Δημοσίευση στην κατηγορία: , , , , ,
Προβολή Τροφοδοσίας RSS με μορφή XML
Με χρήση του Community Server (Commercial Edition), από την Telligent Systems