jeudi 28 octobre 2010

Sync C# - ProducerConsumerQueue

Préambule
L'article "Acknowledgement pattern (Ready/Go)" montrait un mécanisme de synchronisation permettant à deux threads (processus) de se synchroniser pour permettre la communication d'une tâche.
Pour rapperl, le Thread producer devant attendre le signal "ready" avant de présenter sa tâche au thread Consumer (worker). A son tour, le thread Consumer devait attendre le signal "go" pour utiliser la tâche et effectuer son processing.
Cette implémentation à cependant un terrible désavantage, le thread producer est obligé d'attendre la fin du processing par le thread consumer (le worker) avant de pouvoir présenter une nouvelle tâche... c'est terriblement bloquant (1*).
Le pattern ProducerConsumerQueue apporte une réponse à ce problème.

Description
Le pattern ProducerConsumerQueue est une très nette amélioration du pattern Acknowledgment.
Le ProducerConsumerQueue permet au thread producer (celui qui prépare les tâches) d'empiler les tâches dans une queue de traitement sans avoir a se soucier de l'état d'avancement du thread consumer (le worker thread qui traite les tâches).
Une telle structure permet également d'avoir:
  • Plusieurs threads producer (threads qui ajoutent les tâches).
    C'est très utile lorsque la phase de préparation des tâches est plus longue que la phase de traitement.
    Par exemple, l'interprétation d'un fichier (producer) vs Injection des données dans une BD (producer).
  • Plusieurs threads consumer (threads qui traitent les tâches) moyennant quelques adaptations du code.
    Très utile si le traitement ne consomme pas trop de ressources mais est néanmoins long.
    Par exemple, préparation de fichiers de commandes vs communication avec fournisseurssss à l'aide de modems.

Si le Framework .Net dispose effectivement d'une classe ProducerConsumerQueue, cet article abordera malgré tout le fonctionnement détaillé qu'une telle structure. Il est en effet plus simple par la suite d'en comprendre l'utilité.

Exemple - Single Producer, Single Consumer
Cet exemple montre l'implémentation d'un ProducerConsumerQueue dans la classe ProducerConsumerQueue.
  • La classe est Disposable (ce qui permet de libérer le worker thread).
  • La queue dans laquelle s'entassent les tâches est bien évidemment protégée par un lock (ce qui évite des races conditions durant l'empilement/dépilement). 
  • Le worker thread (consumer) utilise un AutoResetEvent pour être averti qu'il y a une tâche à traiter.
  • Le worker thread (consumer) continue le traitement tant qu'il y a une tâche dans la queue.
    Lorsque la queue est vide, le thread attend le signal de l'AutoResetEvent (ce qui indique qu'une nouvelle tâche vient d'être ajoutée).
Le snippet utilise la classe ProducerConsumerQueue pour en démontrer l'usage d'un ProducerConsumerQueue.

Fichier:  Threading_ProducerConsumerQueue.cs
using System;
using System.Collections.Generic;
using System.Threading;
using System.Linq;

public class ProducerConsumerQueue : IDisposable {
    Queue<string> tasks = new Queue<string>(); // Tasks queue, enqueue a NULL to exit worker thread
    EventWaitHandle wh = new AutoResetEvent(false); // Will signal worker thread that new Tasks are present.
    Object locker = new Object();
    Thread worker;
    
    /// <summary>
    /// CTor. Start the worker thread
    /// </summary>
    public ProducerConsumerQueue() {
        worker = new Thread( DoWork );
        worker.Start();
    }
    
    /// <summary>
    /// Enqueue some work
    /// </summary>
    /// <param name="sTaskToPrint"></param>
    public void EnqueueTask( string sTaskToPrint ){
        lock( locker ) {
            tasks.Enqueue( sTaskToPrint );
            wh.Set(); // if worker was blocking (because no more work), signal that new work arrived.
        }    
    }
    
    /// <summary>
    /// Enqueue a NULL to inform the worker thead to Exit
    /// </summary>
    public void EnqueueExitThread() {
        EnqueueTask( null );
    }
    
    /// <summary>
    /// Dispose the Object
    /// </summary>
    public void Dispose() {
        // Signal the consumer to exit
        EnqueueExitThread();
        // Wait worker thread to finish
        worker.Join();
        // Release OS ressource
        wh.Close();
    }
    
    /// <summary>
    /// Method that implement the Consummer
    /// </summary>
    public void DoWork(){
        while(true) {
            string task = null;
            bool waitSignalToContinue = false; // Consumer waits for signal (new item added) to continue to consume
            
            // Dequeue the task
            lock( locker ) {
                if( tasks.Count == 0 ) {
                    // No more work, thread should wait
                    waitSignalToContinue = true;
                }
                else {
                    // Make a copy of the task
                    string temp = tasks.Dequeue();
                    if( temp != null ) task = String.Copy( temp  );
                    
                    // Check exit condition (dequeue a NULL)
                    if( task == null )
                        return;
                }                            
            }
            

            // Wait for signal must be performed OUTSIDE the lock !
            if( waitSignalToContinue ) {
                // if no more task in the list 
                //   --> Block the thread and wait signal for new job to do                
                wh.WaitOne(); 
            }            
            
            // Execute the task
            //    task may be null if the Queue is empty
            if( task != null ){
                Console.WriteLine( "Performing task: "+ task );
                Thread.Sleep( TimeSpan.FromSeconds( 1 ) );
            }        

        }
    }
}

public class MyClass
{
    public static void RunSnippet()
    {
        // Using WL() seems to cause troubles
       WL( "Producer -- first Enqueue" );
        // Creating the Producer-Consumer Queue
        //   And proceed
        using( ProducerConsumerQueue q = new ProducerConsumerQueue() ){
            q.EnqueueTask( "Beginning" );
            foreach( int i in Enumerable.Range( 1, 6 ) )
                q.EnqueueTask( String.Format( "First Enqueue for {0}", i ) );
            q.EnqueueTask( "Pausing producer" );
            Thread.Sleep( TimeSpan.FromSeconds( 2 ));
            // WL( "Producer -- second Enqueue" );
            foreach( int i in Enumerable.Range( 25, 4 ) )
                q.EnqueueTask( String.Format( "Second Enqueue for {0}", i ) );
            q.EnqueueTask( "That's all folks" );
            WL( "Producer -- Enqueue finished" );
        } // The usage of USING will enforce q.Dispose (that join the worker thread)
        WL( "Producer -- ProducerConsuumerQueue disposed" );
    }
    
    #region Helper methods
    ...
    #endregion
}

Ce qui produit le résultat suivant:
Producer -- first Enqueue
Performing task: Beginning
Performing task: First Enqueue for 1
Producer -- Enqueue finished
Performing task: First Enqueue for 2
Performing task: First Enqueue for 3
Performing task: First Enqueue for 4
Performing task: First Enqueue for 5
Performing task: First Enqueue for 6
Performing task: Pausing producer
Performing task: Second Enqueue for 25
Performing task: Second Enqueue for 26
Performing task: Second Enqueue for 27
Performing task: Second Enqueue for 28
Performing task: That's all folks
Producer -- ProducerConsuumerQueue disposed
Press any key to continue...


Le vilain petit DeadLock
En travaillant sur une version MultiProducer de l'algorithme, je me suis aperçu qu'il était impossible de redémarrer une série de traitement lorsque le worker thread avait terminé sa première volée de tâche.
De même, quelques instructions d'affichage avant d'ajouter la première tâches dans la queue produisait le même effet (freeze).
Il y avait donc un bug dans cette première version.
Tout cela était dût à un DeadLock dans le worker dont le petit bout de code fautif est repris ci-dessous:
 public void DoWork(){
        while(true) {
            string task = null;
            // Dequeue the task
            lock( locker ) {
                if( tasks.Count == 0 ) {
                    // if no more task in the list 
                    //   --> Block the thread and wait signal for new job to do                
                    wh.WaitOne(); 
                }
                else {
                    // Make a copy of the task
                    string temp = tasks.Dequeue();
                    if( temp != null ) task = String.Copy( temp  );
En quelques mots:
  • S'il n'y a plus rien a traiter dans la queue, le worker thread passe en état bloqué (instruction WaitOne).
  • Mais cet état bloqué intervient alors qu'un lock est maintenu sur l'objet "locker".
    Le thread est dé-schédulé avec un lock actif :-/ (alors qu'on est pas dans un pattern WaitAndPulse où cela est possible).
  • Le DeadLock intervient lorsque l'on exécute la méthode ProducerConsumerQueue.EnqueueTask.
    En effet, pour pouvoir empiler la tâche en toute sécurité, la méthode à aussi besoin d'acquerir le lock sur l'objet "locker" (qui restera locké Advitam Eternam par le worker thread!).
Exemple - Multiple Producer, Single Consumer

Le fichier Threading_MultiProducerConsumerQueue.cs contient un exemple ou la queue de la classe ProducerConsumerQueue est alimentée par plusieurs threads.

Fichier: Threading_MultiProducerConsumerQueue.cs

Seule la section de code concernant le corps du snippet et les threads d'empilement des tâches sont repris a titre indicatif.

/// <summary>
/// Parametrization Data for EnqueueThread
/// </summary>
public class EnqueueThreadStartData {
    public  EnqueueThreadStartData ( ProducerConsumerQueue q, int startIndex ) : base() {
        Queue = q;
        StartIndex = startIndex;
    }
    
    public ProducerConsumerQueue Queue { get; set; }
    public int StartIndex { get; set; }
}

/// <summary>
/// Test class containing the snippet
/// </summary>
public class MyClass
{
    public static void RunSnippet()
    {
        ProducerConsumerQueue q = new ProducerConsumerQueue();
        
        // Enqueue data with threads
        Thread thEnqueue1 = new Thread( new ParameterizedThreadStart( DoWork_Enqueue ) );
        Thread thEnqueue2 = new Thread( new ParameterizedThreadStart( DoWork_Enqueue ) );
        
        thEnqueue1.Start( new EnqueueThreadStartData( q, 1000 ) );
        thEnqueue2.Start( new EnqueueThreadStartData( q, 2000 ) );
        thEnqueue1.Join();
        thEnqueue2.Join();
        WL( "Enqueue Finished");
        
        WL( "Press Enter to enqueue a new values" );
        RL();
        Thread thEnqueue3 = new Thread( new ParameterizedThreadStart( DoWork_Enqueue ) );
        thEnqueue3.Start( new EnqueueThreadStartData( q, 555 ) );
        
        // WARNING:
        //  if we omit to join, the q.Dispose will dispose the worker Thread (the consumer)
        //  and ressources while the thEnqueue3 (producer) is still adding data to the queue.
        //  THIS WOULD RESULT IN APPLICATION CRASH!
        thEnqueue3.Join();
        
        // Join worker thread and release resource
        q.Dispose(); 
    }
    
    private static void DoWork_Enqueue( object threadStartData ) {
        // cast data object
        EnqueueThreadStartData data = (EnqueueThreadStartData)threadStartData;
        
        WL( string.Format( "Enqueue Thread {0} started. Will enqueue values from index {1}", Thread.CurrentThread.GetHashCode(), data.StartIndex ) );
        
        // Enqueue values 
        foreach( int i in Enumerable.Range(  data.StartIndex, 25 )) {
            // Enqueueing is reentrant [thread safe] because of internal lock on the queue
            WL( string.Format( "Thread {0} Enqueue value {1}", Thread.CurrentThread.GetHashCode(), i ) );
              data.Queue.EnqueueTask( string.Format("task #{0}", i ) );
            // Stimulate context switching
            Thread.Sleep( TimeSpan.FromMilliseconds(50) );
        }
        WL( string.Format( "Enqueue Thread {0} finished", Thread.CurrentThread.GetHashCode() ) );
    }
    
    #region Helper methods 
    ...#endregion
}

Notes de bas de page
1*: Quel jeu de mot :-) ! 
C'est en effet bloquant d'un point de vue logiciel puisqu'il y a des moments d'attente manifeste dans le soft. Mais c'est également bloquant parce que la synchronisation est faite avec des AutoResetEvent qui sont des procédés de synchro dit "bloquant", le thread est dé-schédulé jusqu'au signalement de l'évènement.

Aucun commentaire: