lundi 1 novembre 2010

Sync C# - Multi-Producer Multi-Consumer Queue

L'article "Sync C# - ProducerConsumerQueue" décrivait et implémentait le pattern ProducerConsumerQueue (lecture recommandée pour la compréhension de cet article).

L'article ici présent est un complément qui met en place une version plus avancée du pattern où il est possible:
  • D'avoir plusieurs producer threads (soit plusieurs threads ajoutant des tâches dans la queue de traitement).
  • Mais également d'avoir plusieurs consumer threads (soit plusieurs worker threads traitant les tâches en attente dans la queue de traitement).
Exemple - Multiple Producer, Multiple Consumer
Le fichier Threading_MultiProducerMultiConsumerQueue.cs contient un exemple où la queue (de la classe ProducerConsumerQueue) est alimentée par plusieurs threads et où les tâches traitées par plusieurs worker threads.
Le nombre de worker threads (consumer) étant indiqué en paramètre lors de la création de la classe ProducerConsumerQueue.

Quelques-mots d'explication:
  • Tout comme dans le précédent article, lorsque qu'un worker thread (consumer) n'a plus de tâche à traiter dans la queue, il passe en attente de signal/évènement.
  • Tout comme dans le précédent article, un WaitHandle est utilisé pour signaler qu'une tâche est en attente de traitement. Ce WaitHandle se nomme whNewTaskAvailable.
    Comme c'est un AutoResetEvent, un seul des worker threads en attente est alerté. Si une deuxième tache arrive rapidement dans la queue, le signal est relancé et sera (plus que probablement) pris en charge par un autre worker thread en attente.
  • Tout comme le précédent article, empiler une tâche NULL permet informe via la queue que le traitement est terminé.
    Cependant, il y a ici une différence notable:
    • A) Le worker thread qui recoit la tache NULL lève le signal whExitWorkerThreads et termine son exécution.
    • B) Les autres workers threads en attente de signal et recevant le signal whExitWorkerThreads terminent leur processus. 
  • Un ManualResetEvent (WaitHandle) est utiliser pour signaler aux worker threads d'arrêter le processus (et le traitement de la queue).
    Il s'agit ici d'un Reset Event de type manuel car tous les workers threads doivent pouvoir être signalés.
    En effet, un AutoResetEvent n'aurait permis de signaler qu'un seul worker thread à la fois (puisqu'il est resetté automatiquement à chaque fois).
  • Les workers threads utilisent l'instruction WaitHandle.WaitAny pour détecter le signal/évènement whNewTaskAvailable ou whExitWorkerThreads.

Code du ProducerConsumerQueue
Le code complet du snippet est disponible dans le fichier Threading_MultiProducerMultiConsumerQueue.cs
public class ProducerConsumerQueue : IDisposable {
    // Set true to see additional synchronization message
    const bool _SHOW_DEBUG_ = true;
    // Tasks queue, enqueue a NULL to exit worker thread
    Queue<string> tasks = new Queue<string>(); 
    // Will signal worker thread that new Tasks are present.
    //   AutoResetEvent because only one thread can take the task
    EventWaitHandle whNewTaskAvailable = new AutoResetEvent(false);
    // Will signal the woker Threads to terminate the execution
    //   All threads must be signaled
    EventWaitHandle whExitWorkerThreads = new ManualResetEvent(false);
    Object locker = new Object();
    List<Thread> workers = new List<Thread>();
    
    /// <summary>
    /// protect CTor without arguments
    /// </summary>
    private ProducerConsumerQueue() 
    {
    }
    
    /// <summary>
    /// CTor. Start the worker thread
    /// </summary>
    /// <param name="NbrWorkerThreads">Number of consumer threads to starts</param>
    public ProducerConsumerQueue( int NbrWorkerThreads ) {
        if( NbrWorkerThreads <= 0 )
            throw new ArgumentOutOfRangeException("NbrWorkerThreads", NbrWorkerThreads, "Must be greater than 0" );
        foreach( int i in Enumerable.Range( 0, NbrWorkerThreads )) {
            if( _SHOW_DEBUG_ )
                Console.WriteLine( String.Format( "Create Worker #{0}", i+1) );
            Thread worker = new Thread( DoWork );
            workers.Add( worker );
            worker.Start();
        }
    }
    
    /// <summary>
    /// Enqueue some work
    /// </summary>
    /// <param name="sTaskToPrint"></param>
    public void EnqueueTask( string sTaskToPrint ){
        lock( locker ) {
            tasks.Enqueue( sTaskToPrint );
            whNewTaskAvailable.Set(); // if worker was blocking (because no more work), signal that new work arrived.
        }    
    }
    
    /// <summary>
    /// Enqueue a NULL to inform the worker threads to Exit
    /// The whExitWorkerThreads is not signaled here, this ensure all the queued tasks
    /// to be processed 
    /// </summary>
    public void EnqueueExitThread() {
        EnqueueTask( null );
    }
    
    /// <summary>
    /// Dispose the Object
    /// </summary>
    public void Dispose() {
        // Signal the consumer to exit
        EnqueueExitThread();
        // Wait workers threads to finish
        foreach( Thread worker in workers )
            worker.Join();
        // Release OS ressource
        whNewTaskAvailable.Close();
        whExitWorkerThreads.Close();
    }
    
    /// <summary>
    /// Method that implement the Consummer
    /// </summary>
    public void DoWork(){
        while(true) {
            string task = null;
            bool waitSignalToContinue = false; // Consumer waits for signal (new item added) to continue to consume
            
            // Dequeue the task
            lock( locker ) {
                if( tasks.Count == 0 ) {
                    // No more work, thread should wait
                    waitSignalToContinue = true;
                }
                else {
                    // Make a copy of the task
                    string temp = tasks.Dequeue();
                    if( temp != null ) task = String.Copy( temp  );
                
                    // Null is used to signal Exit Worker Thread
                    if( task == null ) {
                        if( _SHOW_DEBUG_ )
                            Console.WriteLine( String.Format("Received null task, current thread {0} set ExitWorkerThread signal and terminates", Thread.CurrentThread.GetHashCode()) );
                        // Signal ALL the worker threads to exit now;
                        whExitWorkerThreads.Set();
                        return; // Exit current thread
                    }
                }
                            
            }

            // Wait for signal must be performed OUTSIDE the lock !
            if( waitSignalToContinue ) {
                // if no more task in the list 
                //   --> Block the thread and wait signal for new job to do    (or Exit thread)
                if( _SHOW_DEBUG_ )
                    Console.WriteLine( String.Format( "Thread {0} WaitAny", Thread.CurrentThread.GetHashCode()) );
                int whIdx = WaitHandle.WaitAny( new WaitHandle[] { whNewTaskAvailable, whExitWorkerThreads } );
                switch ( whIdx ) {
                    case 0 : // whNewTaskAvailable
                        if( _SHOW_DEBUG_ )
                            Console.WriteLine( String.Format( "Thread {0} Received NewTaskAvailable signal", Thread.CurrentThread.GetHashCode()) );
                        // New task to execute -> continue thread execution
                        break;
                    case 1 : // whExitWorkerThreads
                        if( _SHOW_DEBUG_ )
                            Console.WriteLine( String.Format( "Thread {0} Received ExitWorkerThread signal", Thread.CurrentThread.GetHashCode()) );
                        // Exit the worker thread
                        return;
                    default:
                        throw new Exception( String.Format("Unexpected whIdx {0} value", whIdx ) );
                } 
            }

            
            // Execute the task
            //   - task may be null if the Queue is empty
            //   - An whNewTaskAvailable may also not be captured yet 
            if( task != null ){
                Console.WriteLine( String.Format( "Worker Thread {0} Performing task: {1}", Thread.CurrentThread.GetHashCode(), task) );
                Thread.Sleep( TimeSpan.FromMilliseconds(500) );
            }        

        }
    }
}

Aucun commentaire: