mardi 9 novembre 2010

Sync C# - Wait And Pulse - ProducerConsumerQueue non bloquant

Qu'est-ce qu'une synchronisation bloquante
Par exemple, lorsque l'on protège une section de code comme par exemple l'accès a une variable utilisée par deux threads, ont met en place une synchronisation bloquante.Un thread ne peut absolument pas accéder à la section critique (protégée par la section du lock) pendant qu'un autre thread accède à la section. Le thread ne pouvant pas accéder à la section critique est bloqué (c'est d'ailleurs sont état interne).
Ce même cas de figure se présente également pour les threads partageant une même section critique, comme c'est le cas entre-autre pour les précédents exemples démontrant le fonctionnement interne d'une ProducerConsumerQueue (voir les articles Sync C# - ProducerConsumerQueue et Sync C# - Multi-Producer Multi-Consumer Queue).

Le thread bloqué n'utilise plus de ressource CPU mais peut également être retiré de la pile d'exécution par le scheduler. Cette perte de temps inhérent aux opérations de scheduling/descheduling (context switching) peut devenir trop important si l'on est face à un système nécessitant une forte réactivité.
C'est là qu'intervient Wait And Pulse.

Description du pattern Wait And Pulse
Wait And Pulse est une méthode de synchronisation non bloquante.
Wait And Pulse est un pattern permettant de partager une sections critique (lock) entre deux ou plusieurs threads tout en évitant le context switching (scheduling/deschéduling).
La section de lock est partagée (balancée) entre les threads à l'aide des instructions Monitor.Wait( LockingObject ) et Monitor.Pulse( LockingObject ).
Pour fonctionner correctement, le pattern doit être scrupuleusement suivit... et la section critique doit impérativement commencé par un Wait.
Losrque le wait est exécuté par un thread (le waiting thread) sur dans la section critique, le Framework libère le bloquage... ce qui permet à un autre thread d'acquérir (ou réacquérir) ce blocage jusqu'aà l'exécution d'un Pulse (lui c'est le pulsing thread).
Donc, en pratique, le pulsing thread effectue ses traitements et fait un pulse.
Lorsque le pulse est exécuté, le pulsing thread se bloque... libère le blocage pour le waiting thread.
Le waiting thread réacquière le lock et effectue ses traitements... une fois fait, il réexécute un wait, ce qui libère le lock pour le pulsing thread.
Le pulsing thread réacquière le lock et effectue ses traitements et puis fait un pulse... etc... etc... etc.

Dans cette description, on remarque nettement que le Waiting thread et le Pulsing thread travaillent tous les deux en tandem. Et l'exécution balance d'un thread à l'autre.
Si les explications manquent d'un peu de clarté, confrontez les au pseudo-code ci-dessous.

Pseudo-code du pattern
class WaitPulsedPattern {
    /* ... Déclaration privé variables protégées et partagées
      ... ex: une queue, un flag Go, une combinaison de flags, etc */
    private object locker = new object(); 

    public WaitPulsedPattern() { // CTor
         /* ... Démarrage des Waiter Thread (consumer)
           ... Exécute la méthode Worker()  */
    }
    
    public SomeProducer() {    
        /* ... méthode producer qui modifie la variable protégé
           ...    et fait le pulse */
    
        lock( locker ) {
            // ... modification des variables protégées
            Monitor.Pulse( locker ); 
        }
    }

    public void Worker() {
        while( true ) {
            lock( locker ) {
                while( ! Variables_Partagées_Indique_Du_Travail )
                    Monitor.Wait( locker );            
                CopieVariable = ... copier les informations du travail à faire ... 
            } // lock libéré  
            
            // Arrêter le thread de traitement?
            if( Conditions_D_arret )
                return;
            
            // ... effectuer le travail ...
        }
    }
}


Wait And Pulse ProducerConsumerQueue 
Voici une implémentation du pattern ProducerConsumerQueue (multi-consumer) utilisant Wait And Pulse.
Fichier: Threading_ProducerConsumerQueue_WaitPulse.cs

Quelques mots d'explication:
  • Les tâches sont empilées dans une queue (_taskQueue) protégé par l'objet de synchronisation _locker.
  • L'objet de synchronisation pour mettre en place la pattern Wait And Pulse.
  • Les threads de traitement (consumers) passe en Wait dès que la queue de traitement est vide.
  • Les threads de traitement terminent leurs processus dès qu'une tâche null est extraite de la queue (c'est l'exit condition des threads).
  • Les threads de traitement maintiennent le lock de façon aussi brève que possible.
    Il s'agit du temps entre l'acquisition lock (ou réacquisition du lock après un wait) ET le wait suivant ou la sortie du block de locking.
  • Les threads consummers (ceux qui font le traitement des tâches) sont créés dès l'instanciation de la classe. Cela assure toujours que le premier wait sera exécuté avant le premier pulse.
    Le premier thread acquière immédiatement le lock et exécute le wait (car la queue est vide).
    Ce qui à pour effet de relâcher le lock en attente d'un pulse dans le thread principal.
  • Le thread principal enqueue les tâches et utilise pulseAll pour signaler TOUS les consumers threads en attente.
  • Finalement, Dispose empile des nulls dans la queue (exit condition des consumer threads) et attends la fin de leur exécution.

using System;
using System.Collections.Generic;
using System.Threading;

public class WaitPulseProducerConsumerQueue : IDisposable
{
    private object _locker = new object();
    private Queue<string> _tasksQueue = new Queue<string>();
    private Thread[] consumerThreads;
    
    public WaitPulseProducerConsumerQueue( int ConsumerCount ) 
    {
        // Create and Fire Consumer Threads
        consumerThreads = new Thread[ConsumerCount];
        for( int i = 0; i < ConsumerCount; i++ )
            consumerThreads[i] = new Thread( new ThreadStart(ConsumeWorker) );
        // Wait/Pulse requires Wait to be executed first --> start the 
        //    threads now will ensure it.
        foreach( Thread th in consumerThreads )
            th.Start();
    }
    
    /// <summary>
    /// Consumer for the Queue. This will run into a thread
    /// </summary>
    public void ConsumeWorker() 
    {
        while( true ) {
            string task;
            lock( _locker ){
                // If the queue is empty --> wait (will release the lock)
                if( _tasksQueue.Count == 0 ) Monitor.Wait( _locker );
                // Dequeue task & release the lock
                task = _tasksQueue.Dequeue();
            }
            // Exit condition --> exit the thread
            if( task == null )
                return;
            // process the task
            Console.WriteLine( string.Format( "Processing task {0}", task ) );
            // Mimic processing
            Thread.Sleep( TimeSpan.FromSeconds( 1 ) );
        }
    }
    
    public void Dispose() {
        // Inform threads to exit
        foreach( Thread th in consumerThreads ) EnqueueThreadExit();
        // Wait all thread to ends
        foreach( Thread th in consumerThreads ) th.Join();
    }
    
    /// <summary>
    /// Enqueue data + pulse worker
    /// </summary>
    /// <param name="sTask"></param>
    public void Enqueue( string sTask ) {
        // Wait to acquire the lock
        lock( _locker ) {
            // Enqueue data
            _tasksQueue.Enqueue( sTask );
            // release lock to worker (until worker Waits)
            Monitor.PulseAll( _locker );
        }
    }
    
    private void EnqueueThreadExit() 
    {
        Enqueue( null );
    }
}
public class MyClass
{
    public static void RunSnippet()
    {
        const int NBR_CONSUMER_THREADS = 3;
        WaitPulseProducerConsumerQueue q = new WaitPulseProducerConsumerQueue( NBR_CONSUMER_THREADS );
        WL( "Enqueue data" );
        for( int i = 1; i<= 12; i++ )
            q.Enqueue( string.Format( "task {0}", i ) );
        
        WL( "Disposing queue (send exit signal + joining  consumer threads)" );
        q.Dispose(); // ensure worker threads Join
        
        WL( "Queue disposed" );
    }
    
    #region Helper methods
    ...
    #endregion
}

Résultats:
Enqueue data
Processing task task 1
Disposing queue (send exit signal + joining  consumer threads)
Processing task task 2
Processing task task 3
Processing task task 4
Processing task task 5
Processing task task 6
Processing task task 7
Processing task task 8
Processing task task 9
Processing task task 10
Processing task task 11
Processing task task 12
Queue disposed
Press any key to continue...

Aucun commentaire: