mardi 26 octobre 2010

Sync C# - Semaphore

Description du Sémaphore
Un sémaphore c'est un peu comme une boite de nuit. Elle a une certaine capacité et est gardée par des videurs.
Lorsque la boite de nuit est remplie, les videurs ne laisse plus entrer personne et une file se forme dehors.
Lorsque des personnes sortent, le un nombre identique de personnes peuvent entrer (ce qui diminue la file d'attente).
Cette métaphore représente parfaitement le fonctionnement d'un sémaphore.
Lorsque le sémaphore est créé, il faut indiquer la capacité totale et le nombre de place disponible.
Dans la pratique, un sémaphore décompte le nombre de places disponibles au fur qu'il les allouent (qu'il laisse rentrer les threads).
Ainsi, pour un sémaphore de capacité 3. Le nombre de place disponible est 3 si aucune des places n'a été demandée. Si deux threads demandent d'entrer dans le sémaphore (la boite de nuit) avec l'instruction WaitOne, ils rentrerons et le nombre de place disponible tombe à 1.
Si deux nouveaux threads demandent alors à  rentrer, le premier obtient son droit de passage et le sémaphore tombe à 0 place disponible. Le deuxième thread doit attendre à la porte qu'un des trois premier threads sorte du sémaphore (ce qui aura pour effet d'incrémenter le nbre de place disponible dans le sémaphore et d'autoriser une nouvelle entrée).

Cas d'utilisation de Sémaphore
Limitation de charge
ne pas autoriser plus d'un certains nombre d'exécutions en parallèle pour limiter la charge sur le système.
Par exemple traiter une série de fichiers (ex: 35) à l'aide de threads ThreadPool.QueueUserWorkItem (un par fichier) mais limiter le traitement concurrent à deux fichiers en même temps avec un sémaphore à deux places (car les I/O, ca coute cher en ressource).

Utiliser comme Mutex
Probablement inutile car la plateforme .Net dispose d'une classe Mutex.
Déclarer un Sémaphore avec une capacité de 1 fonctionnera comme un mutex.

Synchronisation des threads
Il est possible de synchroniser des tâches (threads) à l'aide d'un Sémaphore créé comme n'ayant plus de place disponible (mais une capacité de 1).
Ainsi, si le Thread 1 doit impérativement terminer sa tâche avant le démarrage du thread 2:
  1. Un sémaphore de capacité 1 et place disponible est créer.
  2. Thread 1 est démarré
  3. Thread 2 est démarré, sa première instruction est de vouloir entrer dans le sémaphore. 
Lorsque le thread 1 aura terminé sa tâche, il quitte le sémaphore.
Cela libère une place dans le sémaphore et permet au thread 2 d'y rentrer.
Thread 2 à donc bien attendu la fin de l'exécution de la première tâche.
 Ce procédé est appelé "Barrier" (barrière) et permet visiblement la synchronisation de plusieurs tâche.
Note:
Un résultat identique peut-être obtenu à l'aide de l'utilisation Manual/Auto ResetEvent avec deux cas de figures:
1) Soit avec un AutoResetEvent + ThreadPool.RegisterWaitForSingleObject.
2) Soit un AutoResetEvent + le thread 2 deja démarré qui fait un WaitOne sur l'AutoResetEvent


Exemple
Dans cet exemple, on utilise le threadpool à l'aide de Async Delegate Invocation pour démarrer 25 traitements différents.
Comme on utilise le ThreadPoll, plusieurs traitement seront exécutés en parallèle à la discrétion du ThreadPool lui même.
L'exemple utilise un sémaphore pour limiter le traitement en concurrence à deux instances de traitement.
Qu'il soit bien clair que cet exemple limite le traitement en parallèle et non le nombre de threads.
Par ailleurs, comme les threads en attentent sur le sémaphore utilisent l'instruction WaitOne (instruction dite bloquante/blocking), ces threads sont sujets au "context switch" et seront dé-schédulés... il ne consommeront donc pas inutilement du temps processeur.

Résultat sans usage du sémaphore
Sans usage du sémaphore, l'on constate que plusieurs threads entament leur traitement en même temps.
La première salve de traitement concurrent est marqué comme ceci.

Thread 4 is processing ressource MAN: Metropolitan Area Network
Thread 3 is processing ressource MACAO : Méthode d'analyse et de con
plications orientées objet
Thread 5 is processing ressource MAO : Musique assistée par ordinate
Thread 6 is processing ressource MAPI : Microsoft Messaging Applicat
ing Interface
Thread 7 is processing ressource Mb : mégabit
Thread 8 is processing ressource Mbit : mégabit
Thread 9 is processing ressource Mbit/s : mégabit par seconde
Thread 10 is processing ressource MBR : Master Boot Record
Thread 11 is processing ressource MBSA : Microsoft Baseline Security
Thread 4 finished
Thread 4 is processing ressource MCD : Modèle Conceptuel de Données
e MERISE)
Thread 3 finished

Résultat avec sémaphore
Par contre, avec le sémaphore activé, seul deux threads effectuent leur traitement en même temps.
La première salve de traitement concurrent est indiqué comme cecila seconde comme cela et la troisième comme ceci.
L'on constate aisément qu'il n'y a que deux traitements exécutés en parallèle.

Thread 4 is processing ressource MAN: Metropolitan Area Network
Thread 3 is processing ressource MACAO : Méthode d'analyse et de co
plications orientées objet
Thread 4 finished
Thread 4 is processing ressource MCD : Modèle Conceptuel de Données
e MERISE)
Thread 3 finished
Thread 6 is processing ressource MAPI : Microsoft Messaging Applica
ing Interface
Thread 3 did extract MACAO from MACAO : Méthode d'analyse et de con
lications orientées objet
Thread 4 did extract MAN from MAN: Metropolitan Area Network
Thread 4 finished
Thread 4 is processing ressource MD4 : Message Digest 4
Thread 6 finished
Thread 6 is processing ressource MD5 : Message Digest Version 5
Thread 4 finished
Thread 19 is processing ressource MDI : Multiple Document Interface
Thread 6 finished

Code Source
Fichier:  Threading_Semaphore.cs
using System;
using System.Collections.Generic;
using System.Threading;

/*
  Sample demonstrating usage of Semaphore.
  This sample will start 25 request on the threadpool to  a worker (to work on 25 different ressources) 
  but the coder did decided to run only 2 concurrent processing at the same time (to limits ressource consumption)
  whatever the number of threads is allocated.
  
  This limitation is taken in charge with a Semaphore.

  Note: This sample also use the Asynchronous Delegate Pattern to befenits from ThreadPoll
        http://domeu.blogspot.com/2010/09/threading-en-c-exemple-asynchronous.html
*/

/// <summary>
/// Declare the Worker delegate for Asynchronous Delegate invocation
/// </summary>
public delegate string Worker ( string name );

public class MyClass
{
    // switch it to false to see how the processing is handled without the Semaphore
    private const bool USE_SEMAPHORE = true; 
    // Semaphore Available=2, Capacity=2
    private static Semaphore ConcurrentProcessingSemaphore = new Semaphore( 2, 2 ); 
    // Sample data
    private static List<string> nameList = new List<string>() { 
        "MACAO : Méthode d'analyse et de conception d'applications orientées objet", "MAN: Metropolitan Area Network", "MAO : Musique assistée par ordinateur", "MAPI : Microsoft Messaging Application Programming Interface", "Mb : mégabit",
        "Mbit : mégabit", "Mbit/s : mégabit par seconde", "MBR : Master Boot Record", "MBSA : Microsoft Baseline Security Analyzer", "MCD : Modèle Conceptuel de Données (voir méthode MERISE)",     
        
        "MCGA : Multicolor Graphics Array", "MCH : Memory Controller Hub", "MCI : Multimedia Command Interface", "MCSE : Microsoft Certified Systems Engineer", "MCT : Microsoft Certified Trainer",
        "MD4 : Message Digest 4", "MD5 : Message Digest Version 5", "MDA : Model driven architecture", "MDF : Meta Data Framework (Cisco)", "mdk : Mandrakelinux",
        
        "MDI : Multiple Document Interface", "MDR : Mort De Rire", "MFLOPS : Million Floating Point Operations Per Second", "MFM : Modified Frequency Modulation",     "MH : Mail Handler"
    };
    
    public static void RunSnippet()
    {
        List<IAsyncResult> asyncResults= new List<IAsyncResult>();
        List<Worker> workers = new List<Worker>();
        
        //=== Start the Threads ====================
        // use threadpool for executing (with Asynchronous delegate).
        foreach( string sValue in nameList ) {
            // Instanciate the Delegate
            //    and point it to the target worker method
            Worker worker = new Worker( DoWork );
            IAsyncResult asyncResult = worker.BeginInvoke( sValue, null, null );
            asyncResults.Add( asyncResult );
            workers.Add( worker );
        }
        
        //=== Doing some processing ================
        // ....
        
        //=== Wait for the results =================
        for( int i = 0; i<asyncResults.Count; i++ ){
            // Retreive Asynch references
            Worker worker = workers[i];
            IAsyncResult asyncResult = asyncResults[i];
            string result = string.Empty;
            // EndInvoke
            result = worker.EndInvoke( asyncResult );
            // Display result
            WL( result );
        }
    }
    
    public static string DoWork( string name ){
        if( USE_SEMAPHORE )
            ConcurrentProcessingSemaphore.WaitOne();
        try {
            WL( string.Format("Thread {0} is processing ressource {1}", Thread.CurrentThread.GetHashCode(), name) ); 
            // Using long delay will cause the ThreadPool 
            // to allocate others threads for the other asynch invocation
            Thread.Sleep( TimeSpan.FromSeconds(4) );
            WL( string.Format("Thread {0} finished", Thread.CurrentThread.GetHashCode()) ); 
            return string.Format( "Thread {0} did extract {1} from {2}", Thread.CurrentThread.GetHashCode(), name.Split(':')[0].Trim(), name ); // Simulate modification of value
        }
        finally {
            if( USE_SEMAPHORE )
                ConcurrentProcessingSemaphore.Release();
        }
    }
    
    #region Helper methods
    ...
    #endregion
}

Aucun commentaire: