Thread Pool et futures – exemple de programmation par promesses

Ce document montre deux versions d'un programme utilisant des futures pour réaliser du code en parallèle :

Ce qui suit détaille un exemple complet et opérationnel de programme implémentant un calcul séquentiel (la comptabilité du nombre de mots dans un fichier, appliqué à plusieurs fichiers; probablement le problème canonique dans le genre) et son implémentation parallèle, entre autres dans une optique de cibler intelligemment le moment le plus opportun pour passer d'une stratégie à l'autre. Nous explorerons au passage le schéma de conception Regroupement de threads (Thread Pool) et la métaphore de la programmation par promesses, à l'aide de futures.

Le problème que nous atttaquerons sera un classique dans le genre, c'est-à-dire compter les mots dans un fichier. C'est un problème qui, pour un ensemble de fichiers donné, se résout bien séquentiellement (on ouvre un fichier, on compte les mots qui y apparaissent, on le ferme, on passe au suivant), mais qui est tel qu'il est fortement parallélisable (le traitement de chaque fichier est indépendant de celui des autres; seul les moments où sont colligés les résultats est critique).

L'exemple proposé est complet et comprend plusieurs modules. Dans certains cas, les concepts proposés sont détaillés ailleurs sur le site de votre chic prof, alors soyez tolérant(e)s si vous rencontrez des renvois ici et là. J'ai essayé de présenter les modules en ordre croissant de dépendances, pour que les trucs les plus indépendants (les moins fortement couplés) apparaissent d'abord. J'espère que cela vous conviendra. Entre autres, nous aurons recours à l'idiome d'objets incopiables, à des mutex portables, des sections critiques portables et à des événements portables (que je laisse en exercice). Nous utiliserons aussi des assertions statiques, une technique de métaprogrammation bien connue.

Le code en illustration sera découpé en tronçons :

Vous pourrez ainsi naviguer rapidement les sections qui sont moins essentielles au propos, à moins que vous n'ayez un intérêt pour les manoeuvres qui s'y trouvent (et qui sont, en soi, parfois assez amusantes et assez instructives). En escamotant des sections, évidemment, soyez conscient(e)s qu'il est possible que vous deviez faire un acte de foi ici et là en lien avec la technique escamotée au passage.

Vous remarquerez ici et là des commentaires indiquant des raffinement à apporter. Si vous avez envie de vous amuser...

Survol conceptuel

Nous mettrons en place un système capable de comptabiliser le nombre de mots total apparaissant dans un ensemble de fichiers. Il est possible de comptabiliser le même fichier plusieurs fois si le code client le souhaite (filtrer la liste des noms de fichiers pour en éliminer la redondance n'est pas le propos de cet exercice), et on considère un mot comme étant une séquence de symboles séparés par des blancs, ce qui permettra de consommer des mots aussi simplement qu'on consommerait des std::string sur un std::istream à l'aide de l'opérateur >>.

Pour ce faire, nous utiliserons entre autres des futures. Une future est un objet représentant la promesse d'un résultat, qui peut être testé de manière non-bloquante pour savoir si le traitement qui lui est associé a été complété, et qui peut être invoqué de manière bloquante pour en extirper le résultat.

Un exemple simplifié d'utilisation d'une future serait celui exposé à droite. Vous remarquerez sans peine un découplage fort entre les détails techniques de la plateforme sous-jacente, et une grande simplicité d'utilisation. Il faut cependant travailler assez fort pour en arriver à cela, comme vous le constaterez.

La construction d'une future fait en sorte de traiter concurremment la tâche qui lui est associée (ce qui peut être fait en lançant un thread, en demandant à un tiers sur un autre ordinateur de faire le travail, en ajoutant une tâche dans une liste à traiter par un regroupement de threads comme dans cet exemple, etc.). Invoquer l'opérateur () (traiter la future comme une fonction) bloque, s'il y a lieu, jusqu'à obtention du résultat, puis retourne ce résultat. Tester la complétion de la future (ici, la traiter comme un booléen) est une tâche non-bloquante, ce qui permet au code client de vaquer à d'autres occupations.

// ...
struct calcul_long
   : nullary_operator<double>
{
   result_type operator()(); // peu importe
};
// ...
future<calcul_long> f = calcul_long();
while (!f)
{
   // faire des trucs
}
// retourne le résultat du calcul
return f();
// ...

Une future plus complète que celle qui est proposée ici permettrait aussi d'annuler une tâche et de copier la future à loisir. Je vous invite à raffiner le tout si vous le souhaitez.

Nous utiliserons aussi un regroupement de threads (en anglais : Thread Pool). Plutôt que de lancer des threads à la pièce, nous en construirons un certain nombre au préalable, et nous leur donnerons tous une seule et même responsabilité : piger une tâche parmi celles qui sont en attente de traitement, la prendre en charge, puis recommencer. Ces threads seront des ouvriers, ou Worker Threads.

Ceci permettra l'implémentation de futures de manière quelque peu simplifiée, et limitera le coût du démarrage des threads à celui encouru au moment initial (mis à part les moments où le système sera tellement sollicité par des tâches en cours qu'il devra accroître le nombre d'ouvriers à l'oeuvre. Les regroupements de threads foisonnent sur les plateformes contemporaines; vous pouvez remplacer celui proposé ici par celui de votre plateforme de prédilection dans la mesure où vous aurez pris soin d'adapter le code quelque peu.

Reste à voir comment, concrètement, nous compterons les mots de manière séquentielle (le code montrant le traitement en version parallèle a été placé dans la section sur les futures, un peu plus bas).

Une tautologie est une expression logique qui est vraie peu importe les conditions initiales. Pour des raisons techniques sur lesquelles nous reviendrons plus bas, nous utiliserons le foncteur générique whatever qui évalue toujours à vrai.

Depuis C++ 14, il est possible d'écrire plus simplement ceci :

auto whatever = [](auto &&) { return true; };
#ifndef WHATEVER_H
#define WHATEVER_H
struct whatever
{
   template <class T>
      bool operator()(T &&)
         { return true; }
};
#endif

Compter les mots en série est simple : prendre chacun des fichiers à traiter, y compter les mots et retourner le cumul obtenu. Cette approche fonctionne très bien en pratique, et peut être réinvestie par la version parallèle quand le seuil choisi pour passer de parallèle à séquentiel a été atteint.

J'ai séparé le traitement en deux : compter les mots dans un seul document, puis compter les mots dans plusieurs documents (en appelant plusieurs fois la fonction comptant les mots dans un seul document).

#ifndef COMPTER_MOTS_SERIEL
#define COMPTER_MOTS_SERIEL
#include "whatever.h"
#include <algorithm>
#include <numeric>
#include <string>
#include <iterator>
#include <fstream>
int compter_mots_document(const std::string &nom) {
   using namespace std;
   return count_if(
      istream_iterator<string>{ ifstream{ nom } },
      istream_iterator<string>{}, whatever{}
   );
}
template <class It>
int compter_mots_multidocuments_seriel(It debut, It fin)
{
   using namespace std;
   return accumulate(debut, fin, 0, [](int so_far, const string &nom) {
      return so_far + compter_mots_document(nom);
   });
}
#endif

Le programme principal et son rôle

Le programme principal réalisera un ensemble de tests et exposera, dans un fichier de sortie, le fruit des calculs réalisés. Pour l'invoquer, il suffira de lancer le programme avec une liste de noms de fichiers, et de le laisser travailler. Les fruits de ce programme permettront entre autres de trouver un bon seuil de transition séquentiel/ parallèle pour un système donné.

Pour l'essentiel, donc, ce code lance des tâches de traitement séquentiel ou parallèle, tire des métriques, et entrepose le tout dans un fichier de sortie pour consultation ultérieure, à tête reposée. Ce programme fait la démonstration qu'il est possible d'utiliser le regroupement de threads et les futures, mais sert surtout à déterminer les conditions idéales utilisation de ces outils dans un contexte appliqué.

#include "statistiques.h"
#include "compter_mots.h"
#include "compter_mots_seriel.h"
#include "compter_mots_map_reduce.h"
#include "minuterie.h"
#include <sstream>
#include <iostream>
#include <iterator>
#include <algorithm>
#include <string>
#include <sstream>
using namespace std;
const char *terminaison(bool pluriel)
{
   static const char * TERMINAISONS [] = { "", "s" };
   return TERMINAISONS[pluriel? 1 : 0];
}
const char *config()
{
#ifdef _DEBUG
   return "(DEBUG)";
#else
   return "(RELEASE)";
#endif
}
template <class It>
   double tester_seriel(It debut, It fin, ostream &sortie, int n_essai)
   {
      stringstream sstr;
      sortie << "Approche sérielle, essai " << n_essai << endl;
      int n_seriel;
      double ecoule = 0.0;
      {
         minuterie minu{sstr, &ecoule};
         n_seriel = compter_mots_multidocuments_seriel(debut, fin);
      }
      sortie << '\t' << n_seriel << " mots au total" << endl;
      sortie << '\t' << sstr.str() << endl;
      return ecoule;
   }
template <class It>
   double tester_parallele(It debut, It fin, ostream &sortie, int n_essai, int seuil = CompterMots<It>::SEUIL_DEFAUT)
   {
      stringstream sstr;
      sortie << "Approche parallele, essai " << n_essai <<  ", seuil sequentiel " << seuil << endl;
      int n_map_reduce;
      double ecoule = 0.0;
      {
         minuterie minu{sstr, &ecoule};
         n_map_reduce = compter_mots_multidocuments_map_reduce(debut, fin, seuil);
      }
      sortie << '\t' << n_map_reduce << " mots au total" << endl;
      sortie << '\t' << sstr.str() << endl;
      return ecoule;
   }
template <class It>
   void production_statistiques(const std::string &titre, It debut, It fin, std::ostream &os)
   {
      os << "Version " << titre << ":\n"
         << "*\tmeilleur resultat: " << *min_element(debut, fin) << " ms\n"
         << "*\tpire resultat: " << *max_element(debut, fin) << " ms\n"
         << "*\ttemps moyen: " << moyenne(debut, fin) << " ms\n"
         << "*\tecart type: " << ecart_type(debut, fin) << " ms\n"
         << endl;
   }
int main(int argc, char *argv[])
{
   ofstream sortie{"out.txt"};
   const int NDOCS = argc - 1;
   enum { NTESTS = 10 };
   sortie << "Compter les mots, " << NDOCS
          << " document" << terminaison(argc > 2)
          << ", " << NTESTS << " test" << terminaison(NTESTS > 2)
          << ' ' << config() << "\n\t";
   copy (argv + 1, argv + argc, ostream_iterator<char*>{sortie, "\n\t"});
   sortie << endl;
   enum { NSEUILS = 20 };
   double resultats_sequentiel[NTESTS] = { 0.0 };
   double resultats_parallele[NTESTS] = { 0.0 };
   double resultats_parallele_seuil[NSEUILS][NTESTS] = { {0.0} };
   for (int i = 0; i < NTESTS; ++i)
   {
      resultats_sequentiel[i] = tester_seriel(argv + 1, argv + argc, sortie, i + 1);
      resultats_parallele[i] = tester_parallele(argv + 1, argv + argc, sortie, i + 1);
      for (int j = 0; j < NSEUILS; ++j)
         resultats_parallele_seuil[j][i] = tester_parallele(argv + 1, argv + argc, sortie, i + 1, j + 1);
      sortie << endl;
      cout << '.' << flush;
   }
   cout << endl;
   //
   // Production de statistiques simples
   //
   production_statistiques("sequentielle", resultats_sequentiel, resultats_sequentiel + NTESTS, sortie);
   production_statistiques("parallele", resultats_parallele, resultats_parallele + NTESTS, sortie);
   for (int i = 0; i < NSEUILS; ++i)
      production_statistiques("parallele avec seuil sequentiel " + to_string(i+1),
                              resultats_parallele_seuil[i],
                              resultats_parallele_seuil[i] + NTESTS,
                              sortie);
   double moyennes[NSEUILS];
   for (int i = 0; i < NSEUILS; ++i)
      moyennes[i] = moyenne(begin(resultats_parallele_seuil[i]), end(resultats_parallele_seuil[i]));
   sortie << "\n\nMeilleur temps moyen: ";
   double *p = min_element(begin(moyennes), end(moyennes));
   sortie << *p << " ms. avec un seuil sequentiel de " << static_cast<int>(p - moyennes + 1) << endl;
   thread_pool_::get().log_stats(sortie);
}

Pour comprendre les futures

Tel que mentionné plus haut, une future encapsule la promesse du résultat d'un calcul. Les futures que nous utiliserons ici seront couplées avec l'implémentation du regroupement de threads dont nous discuterons plus bas, mais c'est plus un choix d'implémentation de la part de votre humble serviteur qu'une condition technique ou conceptuelle à proprement dit.

Les extensions par traits à la bibliothèque <functional> ont été couverts plus tôt, alors je ne répéterai pas les explications ici. Disons seulement que, pour que les futures puissent capturer la valeur retournée par l'opération qu'elles prennent en charge, il leur faudra connaître le type dans lequel cette valeur sera exprimée. Les traits d'opérations nullaires serviront précisément à cela.

Notez qu'une partie de l'encapsulation permise la l'implémentation de futures proposée ici repose sur des délégués, qui constituent un outil plus lourd dans notre arsenal conceptuel. Si vous souhaitez en savoir plus sur le sujet, il vous faudra travailler un peu plus fort.

Les outils de cette section sont dépréciés, s'harmonisant mal avec la surcharge de fonctions, mais je n'ai pas encore eu le temps de réécrire le tout en fonction des pratiques contemporaines.

#ifndef FUNCTIONAL_EXT_H
#define FUNCTIONAL_EXT_H
#include <functional>
template <class F>
   struct unary_function_traits
   {
      using argument_type = typename F::argument_type;
      using result_type = typename F::result_type;
   };
template <class A, class R>
   struct unary_function_traits<R (*)(A)>
   {
      using argument_type = A;
      using result_type = R;
   };
template <class R>
   struct nullary_function
      : std::unary_function<void, R>
   {
   };
template <class F>
   struct nullary_function_traits
   {
      using result_type = typename F::result_type;
   };
template <class R>
   struct nullary_function_traits<R (*)()>
   {
      using result_type = R;
   };
#endif

Le Thread Pool lancera plusieurs threads ouvriers (ce qu'on nomme typiquement des Worker Threads), qui prendront en charge des tâches, représentées par la classe Tache visible à droite.

Une Tache possèdera un identifiant entier (pour faciliter le débogage). Elle pourra être prise en charge et exécutée (méthode abstraite faire()), testée de manière non bloquante pour vérifier si elle a été complétée, et permettra qu'on se mette en attente bloquante de la fin de son exécution.

D'autres services (attente de la complétion jusqu'à concurrence d'un certain délai maximal) pourraient être ajoutés à cette classe si vous le jugez nécessaire.

#ifndef TACHE_H
#define TACHE_H
#include "Evenement.h"
struct Tache
{
   virtual void faire() = 0;
   virtual ~Tache() = default;
   Tache()
      : complete_{}, no_(++g_no_), evenement_{Evenement::ManualReset{}}
   {
   }
   bool complete() const
      { return complete_; }
   int no() const
      { return no_; }
   void attendre() const
      { evenement_.attendre(); }
private:
   Evenement evenement_;
   static int g_no_;
   int no_;
   bool complete_;
   void fini()
      { complete_ = true; evenement_.provoquer(); }
   //
   // ICI: cacher ce détail technique quelque part
   //
   friend unsigned long __stdcall ze_thread(void *);
};
#endif

Pour des raisons techniques, nous définissons la variable représentant l'identifiant de Tache le plus récemment attribué dans un fichier source. En C++, il est possible de définir les constantes de classe entières dans un fichier d'en-tête, mais les constantes de classe qui ne sont pas entière et les attributs de classe non-constants doivent être définis dans une unité de traduction qui assurera leur unicité (donc, typiquement, dans un fichier source, puisque personne ne les inclura).

#include "Tache.h"
int Tache::g_no_ = 0;

Les threads ouvriers seront capables de prendre une Tache en charge, de l'exécuter et de s'en débarrasser.

Ce sont ces ouvriers qui assureront la dynamique du système. Là où chaque Tache représentera ce qui doit être fait, chaque Worker représentera l'action de faire cette chose.

La méthode clé ici est prendre_en_charge(), qui assure la prise en charge d'une tâche par un ouvrier. Elle est décrite plus bas.

Notez que, bien qu'un Worker ait pour rôle de manipuler des instances de Tache, l'interface entière de la classe Worker n'a aucunement besoin de connaître le détail de la classe Tache ou la taille des instances de cette classe (dans l'interface de Worker, le seul endroit où le nom Tache est utilisé, il l'est à travers un pointeur). Pour cette raison, seul le nom de la classe Tache est introduit ici (instruction struct Tache;), limitant ainsi le couplage entre les fichiers.

#ifndef WORKER_H
#define WORKER_H
#include "Incopiable.h"
#include "Evenement.h"
struct Tache;
//
// Un Worker traite une Tache* sans se préoccuper de considérations
// de synchronisation ou de détails propres au domain d'application
//
class Worker
   : Incopiable
{
   //
   // ICI: cacher cette signature dans un .cpp pour réduire
   // le couplage, quand j'aurai une minute...
   //
   friend unsigned long __stdcall ze_thread(void *);
   bool actif_;
   bool demande_arret_;
   Evenement evenement_;
   void fin()
   {
      actif_ = {};
      evenement_.provoquer();
   }
public:
   bool demande_arret() const
      { return demande_arret_; }
   Worker()
      : actif_{true}, demande_arret_{}, evenement_{Evenement::ManualReset{}}
   {
   }
   void prendre_en_charge(Tache *);
   bool actif() const
      { return actif_; }
   void stop()
   {
      demande_arret_ = true;
      evenement_.attendre();
   }
};
#endif

Le code de prise en charge d'une Tache par un ouvrier est dissimulé dans un fichier source pour fins de réduction de couplage. Sans grande surprise, quand un ouvrier prend en charge une Tache, il lui dit de se faire, et c'est tout.

#include "Worker.h"
#include "Tache.h"
void Worker::prendre_en_charge(Tache *t)
   { t->faire(); }

Pour faciliter la jonction entre les instances de future et les instances de Tache, une classe générique TacheHolder sera utilisée. Tache est une racine polymorphique, ce qui permet de placer des pointeurs sur toutes les tâches dans un seul et même conteneur et d'exprimer le regroupement de threads sur la base de tâches au sens large, mais lorsqu'il vient le temps d'implémenter des tâches spécifiques, nous instancierons un TacheHolder<F> pour chaque tâche de type F. L'abstraction des détails techniques de F passera par un délégué nullaire, ce qui est d'ailleurs un excellent rôle pour des délégués.

Cette classe dépose l'opération à faire dans un délégué, et implémente le service Tache::faire() à l'aide d'une invocation du délégué en question. Ce faisant, l'opération encapsulée par une Tache donnée pourra être une fonction nullaire ou un foncteur nullaire.

Le TacheHolder déduira le type de donnée retourné par la tâche qu'encapsulera le délégué, et saura retenir une copie de cette donnée dans un attribut d'instance, permettant ainsi le découplage temporel entre le travail fait par la future et les besoins du code client.

On trouve des idiomes analogues dans d'autres types de données exigeant à la fois une interface non-générique (pensez à Tache) mais une implémentation générique (pensez à TacheHolder). Pour un exemple, voir la technique de l'effacement de types.

#ifndef TACHE_HOLDER_H
#define TACHE_HOLDER_H
#include "Tache.h"
#include "functional_ext.h"
#include "delegue.h"
template <class F>
   class TacheHolder
      : public Tache
   {
   public:
      using result_type = typename
         nullary_function_traits<F>::result_type;
   private:
      delegue<result_type> fct_;
      result_type res_;
   public:
      TacheHolder(F fct)
         : fct_{fct}
      {
      }
      void faire()
         { res_ = fct_(); }
      result_type result() const
         { return res_; }
   };
#endif

Une future encapsulera un TacheHolder* contenant la tâche à réaliser en parallèle.

Créer une future, dans cette implémentation, injectera la tâche à faire dans la liste contenue dans celles connues du regroupement de threads. à partir de ce moment, on ne sait pas quand la tâche en question sera complétée. Une autre approche pourrait être de lancer un nouveau thread à la construction de la future, mais ça pourrait être coûteux. Cependant, si vous utilisez un cadriciel (un Framework) offrant déjà un regroupement de threads, le code de future exposé à droite pourrait être adapté pour profiter de ses services.

Le reste de l'implémentation de la future sert à rendre conviviale l'interaction entre le code client et le code de traitement concurrent de la tâche. Un raffinement possible serait de partager le TacheHolder* à l'aide d'un pointeur intelligent implémentant une sémantique de partage (par exemple un std::shared_ptr), ce qui permettrait entre autres de copier une future à loisir et d'implémenter une fabrique de future pour alléger la syntaxe du code client. Le TacheHolder* fait une partie importante du travail d'encapsulation du travail à faire, mais n'est pas très convivial; la future, elle, est une métaphore facile à utiliser par le code client.

Le reste est de la technique : on testera de manière non-bloquante une future pour connaître sa complétion à travers son opérateur de conversion en booléen, et on invoquera de manière bloquante le résultat du calcul qu'elle représente à l'aide de son opérateur ().

#ifndef FUTURE_H
#define FUTURE_H
#include "Incopiable.h"
#include "functional_ext.h"
#include "TacheHolder.h"
#include "thread_pool.h"
//
// ICI: à faire: partager fct_ 
//
template <class F>
   class future
      : Incopiable
   {
   public:
      using result_type = typename
         nullary_function_traits<F>::result_type;
   private:
      TacheHolder<F> *fct_;
   public:
      future(F fct)
         : fct_{new TacheHolder<F>(fct)}
      {
         thread_pool_::get().enfiler(fct_);
      }
      ~future() noexcept
         { delete fct_; }
      result_type operator()() const
         { fct_->attendre(); return fct_->result(); }
      operator bool() const
         { return fct_->complete(); }
   };
/*
template <class F>
   future<F> creer_future(F fct)
      { return future<F>(fct); }
*/
#endif

Le code parallèle pour compter des mots dans des fichiers repose sur le code séquentiel accomplissant la même tâche.

La ligne clé est :

cumul_ = fA() + fB();

qui, sans trop que cela ne paraisse, cumule le fruit de deux calculs réalisés en parallèle.

Le seuil par défaut utilisé ici pour passer de séquentiel à parallèle est complètement arbitraire. Un bon seuil serait déterminé de manière empirique.

La fonction creer_compter_mots() est une fonction de fabrication qui allège la construction des objets qui seront encapsulés par les TacheHolder* cachés derrière les futures en déduisant les types de ses paramètres.

Le seuil à partir duquel l'ensemble des fichiers à traiter est suffisamment petit pour procéder de manière séquentielle peut être choisi à la construction d'une instance de CompterMots, ce qui facilite la rédaction des tests de performance (voir plus bas).

#ifndef COMPTER_MOTS_MAP_REDUCE_H
#define COMPTER_MOTS_MAP_REDUCE_H
#include "future.h"
#include "functional_ext.h"
#include "compter_mots_seriel.h"
#include <iterator>
#include <algorithm>
template <class>
   class CompterMots;
template <class It>
   CompterMots<It> creer_compter_mots(It debut, It fin, int &cumul, int seuil = 8)
   {
      return { debut, fin, cumul, seuil };
   }
template <class It>
   class CompterMots
      : public nullary_function<int>
   {
      It debut_, fin_;
      result_type &cumul_;
      int seuil_;
   public:
      enum { SEUIL_DEFAUT = 8 };
      CompterMots(It debut, It fin, int &cumul, int seuil = SEUIL_DEFAUT)
         : debut_{debut}, fin_{fin}, cumul_(cumul), seuil_{seuil}
      {
      }
      int seuil_sequentiel() const
         { return seuil_; }
      result_type operator()()
      {
         using namespace std;
         auto n = distance(debut_, fin_);
         if (n <= seuil_sequentiel())
            cumul_ = compter_mots_multidocuments_seriel(debut_, fin_);
         else
         {
            using future_cpt_mots = future<CompterMots<It>>;
            It milieu = debut_;
            advance(milieu, n/2);
            //
            // Nous pourrons alléger ce qui suit avec les rvalue references de C++11
            //
            int sommeA = 0, sommeB = 0;
            future_cpt_mots fA(
               creer_compter_mots(debut_, milieu, sommeA, seuil_sequentiel())
            );
            future_cpt_mots fB(
               creer_compter_mots(milieu, fin_, sommeB, seuil_sequentiel())
            );
            cumul_ = fA() + fB();
         }
         return cumul_;
      }
   };
template <class It>
   int compter_mots_multidocuments_map_reduce(It debut, It fin, int seuil = CompterMots<It>::SEUIL_DEFAUT)
   {
      //
      // Nous pourrons alléger ce qui suit avec les rvalue references de C++11
      //
      int somme = 0;
      return creer_compter_mots(debut, fin, somme)();
   }
#endif

Pour comprendre le regroupement de threads

Le regroupement de threads sera un singleton, mais c'est plus une question de simplicité que de nécessité car il serait possible d'envisager un système à plusieurs regroupements de threads distincts. Une partie importante du code que vous trouverez dans cette section tire des statistiques d'utilisation; le code du regroupement en soi est plus léger qu'il n'y paraît.

Ici, N est la taille initiale du regroupement. J'utilisais à l'origine des tableaux de taille fixe pour h_ et pour worker_, mais en pratique des approches comme Map/ Reduce consomment beaucoup de threads qui sont mis en attente du fruit du calcul d'autres threads, et on se retrouve avec une croissance rapide de threads dormants. Conséquemment, j'ai choisi d'y aller avec un compromis, soit le vecteur qui offre des temps d'accès et de parcours comparables (pratiquement identiques) à ceux d'un tableau brut mais qui permettent d'adapter leur capacité en fonction des moments.

Ici, notez que vous pourriez avoir plusieurs instances de thread_pool dans la mesure où la valeur attribuée à N serait chaque fois différente. Je ne sais pas à quel point ce serait utile, mais l'implémentation actuelle le permet.

Un thread_pool comprendra, en gros, une file de tâches à faire (attribut taches_, synchronisé par l'attribut mutex_) et une liste d'ouvriers (et de leurs threads), représentée par deux conteneurs (h_ pour les HANDLE et worker_ pour les ouvriers eux-mêmes).

Le regroupement est synchronisé pour deux raisons distinctes, ce qui explique la présence de deux mutex :

  • l'évaluation du nombre de tâches en cours de service est synchronisée. Ceci permet de constater, parfois, que le système est surchargé (par exemple dans le cas où plusieurs tâches sont de nature bloquante et accaparent les ouvriers en les suspendant); et
  • l'accès à la liste des tâches à proprement dit, pour y insérer un Tache* ou pour en retirer un.

N'oubliez pas que la classe Mutex représente le concept de synchronisation par accès exclusif à une ressource. Ici, il est probable qu'une implémentation sous-jacente locale au processus (une section critique plutôt qu'un outil du système d'exploitation) soit avantageuse.

Dans le cas où le système est plutôt plein (voir la méthode du même nom), le regroupement pourra redimensionner la liste des ouvriers pour éviter un phénomène d'attrition des forces de travail. Remarquez que la méthode qui procède à cette redimension est privée, ce qui explique qu'elle puisse supposer que son client, étant interne à la classe, se soit comporté correctement et ait au préalable synchronisé les accès sur les structures de données appropriées. Remarquez aussi que la politique de redimensionnement appliquée ici est d'ajouter un nombre fixe d'ouvriers; ce n'est pas la seule approche possible.

La décision d'ajouter ou non des ouvriers se fait suite à la saisie d'une nouvelle tâche dans le lot des tâches disponibles. Si le système est saturé, alors le regroupement de threads recrutera de nouveaux employés, en quelque sorte.

Le recours à un regroupement de 32 threads par défaut est absolument arbitraire. Pour valider la valeur intiale idéale, il faudrait procéder à des tests plus poussés, ce que je n'ai pas fait.

#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include "Incopiable.h"
#include "Worker.h"
#include "Mutex.h"
#include "Detruire.h"
#include "Fabriquer.h"
#include <algorithm>
#include <queue>
#include <vector>
#include <iterator>
#include <string>
#include <ctime>
#include <ostream>
#include <cassert>
#include <windows.h> // isoler quand j'aurai deux minutes
#include <iostream>
using namespace std; // pas gentil...
template <int N>
   class thread_pool
      : Incopiable
   {
      struct Stats
      {
         struct occupation
         {
            string quoi_;
            int val_;
            clock_t temps_;
            occupation(const string &quoi, int val, clock_t temps)
               : quoi_{quoi}, val_{val}, temps_{temps}
            {
            }
            friend ostream& operator<<(ostream &os, const occupation &occ)
            {
               return os << occ.quoi_ << ": " << occ.val_ << " en "
                         << static_cast<double>(occ.temps_)/CLOCKS_PER_SEC * 1000.0
                         << " ms.";
            }
         };
         vector<occupation> data_;
         friend ostream& operator<<(ostream &os, const Stats &stats)
         {
            copy(stats.data_.begin(), stats.data_.end(), ostream_iterator<occupation>(os, "\n"));
            return os;
         }
      };
      Stats stats_;
      vector<HANDLE> h_;
      vector<Worker*> worker_;
      clock_t temps_;
      thread_pool()
         : h_{N}, worker_{N}, nb_en_cours_{}
      {
         temps_ = clock();
         fill(begin(h_), end(h_), INVALID_HANDLE_VALUE);
         try
         {
            generate(begin(worker_), end(worker_), Fabriquer<Worker>());
            struct creer_thread
            {
               vector<Worker*>::iterator base_;
               creer_thread(vector<Worker*>::iterator base)
                  : base_{base}
               {
               }
               HANDLE operator()()
               {
                  auto h = CreateThread(0, 0, ze_thread, *base_, CREATE_SUSPENDED, 0);
                  assert(h != INVALID_HANDLE_VALUE);
                  ++base_;
                  return h;
               }
            };
            generate(begin(h_), end(h_), creer_thread(worker_.begin()));
            for(auto hdl : h_) ResumeThread(hdl);
         }
         catch (...)
         {
            for(auto p : worker_) delete p;
            throw;
         }
      }
      Mutex mutex_;
      Mutex mutex_cpt_taches_;
      queue<Tache *> taches_;
      int nb_en_cours_;
      bool plutot_plein() const
      {
         Autoverrou av{mutex_cpt_taches_};
         return nb_en_cours_ + 1 == worker_.size();
      }
   public:
      void tache_en_cours()
      {
         Autoverrou av{mutex_cpt_taches_};
         ++nb_en_cours_;
      }
      void tache_completee()
      {
         Autoverrou av{mutex_cpt_taches_};
         --nb_en_cours_;
      }
      bool empty() const
      {
         Autoverrou av{mutex_};
         return av && taches_.empty();
      }
      bool enfiler(Tache *p)
      {
         if (!p) return false;
         Autoverrou av{mutex_};
         if (av)
            taches_.push(p);
         return true;
      }
   private:
      void calculer_temps_travailleurs()
      {
         auto maintenant = clock();
         auto delta_t = maintenant - temps_;
         stats_.data_.push_back(Stats::occupation("Temps avec un nombre de threads", h_.size(), delta_t));
         stats_.data_.push_back(Stats::occupation("Temps avec un nombre de travailleurs", worker_.size(), delta_t));
         temps_ = maintenant;
      }
      void add_workers()
      {
         calculer_temps_travailleurs();
         auto cur_n = worker_.size();
         auto n = cur_n+N;
         try
         {
            for (auto i = cur_n; i != n; ++i)
               worker_.push_back(Fabriquer<Worker>()());
            struct creer_thread
            {
               vector<Worker*>::iterator base_;
               creer_thread(vector<Worker*>::iterator base)
                  : base_{base}
               {
               }
               HANDLE operator()()
               {
                  auto h = CreateThread(0, 0, ze_thread, *base_, CREATE_SUSPENDED, 0);
                  assert(h != INVALID_HANDLE_VALUE);
                  ++base_;
                  return h;
               }
            };
            h_.resize(n);
            vector<HANDLE>::iterator base = h_.begin()+cur_n;
            generate(base, h_.end(), creer_thread(worker_.begin()+cur_n));
            for_each(base, h_.end(), ResumeThread);
         }
         catch (...)
         {
            while (worker_.size() != cur_n)
            {
               delete worker_.back();
               worker_.pop_back();
            }
         }
      }
   public:
      Tache *prochaine()
      {
         Autoverrou av{mutex_};
         if (av && !taches_.empty())
         {
            auto p = taches_.front();
            taches_.pop();
            if (plutot_plein())
               add_workers();
            return p;
         }
         return nullptr;
      }
      ~thread_pool() noexcept
      {
         while (!empty()) // hum...
            ;
         struct stop_work
         {
            void operator()(Worker *w)
            {
               w->stop();
               delete w;
            }
         };
         for_each(worker_.begin(), worker_.end(), stop_work());
         for(auto hdl : h_) CloseHandle(hdl);
      }
      static thread_pool & get() noexcept
      {
         static thread_pool singleton;
         return singleton;
      }
      void log_stats(std::ostream &os)
      {
         calculer_temps_travailleurs();
         os << "Nombre maximum de threads utilises: " << h_.size() << endl;
         os << "Nombre maximum de travailleurs utilises: " << worker_.size() << endl;
         os << stats_ << endl;
      }
   };
using thread_pool_ = thread_pool<32>;
#endif

Tous les ouvriers sont exécutés par une instance distincte de la même fonction, nommée ici ze_thread().

Cette fonction assurera le bon comportement de l'ouvrier et son interaction correcte avec le regroupement de threads implémenté.

Vous remarquerez que ze_thread() se comporte correctement eu égard au regroupement de threads et à la gestion des tâches. Elle fait en quelque sorte partie de l'implémentation du code des ouvriers, et il faut la voir comme telle.

Question comme ça : ce code se comportera-t-il correctement si l'invocation de la méthode faire() sur une tâche donnée, à travers la méthode prendre_en_charge() d'un ouvrier, lève une exception? Si oui, pourquoi? Sinon, doit-on apporter des correctifs (et, si oui, lesquels)?

#include "thread_pool.h"
#include "Worker.h"
#include "Tache.h"
#include <iostream>
using namespace std;
unsigned long __stdcall ze_thread(void *p)
{
   Worker &w = *static_cast<Worker*>(p);
   thread_pool_ &pool = thread_pool_::get();
   while (!w.demande_arret())
   {
      auto tache = pool.prochaine();
      if (tache)
      {
         pool.tache_en_cours();
         w.prendre_en_charge(tache);
         tache->fini();
         pool.tache_completee();
      }
   }
   w.fin();
   return 0;
}

Quelques utilitaires légers

Nous utilisons plusieurs petits utilitaires (souvent des foncteurs) dans le code présenté ici.

Nous aurons aussi recours à des foncteurs de fabrication, par exemple le foncteur Fabriquer, à droite. Il est parfois utile de pouvoir exprimer la fabrication dynamique d'objets sous forme d'opération plutôt que sous forme d'opérateurs...

#ifndef FABRIQUER_H
#define FABRIQUER_H
template <class T>
   struct Fabriquer
   {
      T* operator()() const
         { return new T; }
      template <class P0>
         T* operator()(P0 p0) const
         {
            return new T(p0);
         }
   };
#endif

Notre programme principal fera des comparatifs de vitesse entre une implémentaqtion séquentielle et diverses implémentations parallèles du même traitement.

Pour les besoins de la cause, une saisie du temps écoulé d'une granularité d'une milliseconde suffira. Notez que je triche un peu ici en présumant que CLOCKS_PER_SEC ait la valeur 1000, ce qui n'est pas garanti par la norme du langage; c'est pas gentil!

Nous utiliserons donc la minuterie simpliste proposée à droite.

#ifndef MINUTERIE_H
#define MINUTERIE_H
#include <ctime>
#include <ostream>
using namespace std;
class minuterie
{
   ostream &os_;
   clock_t avant_;
   double *dest_;
public:
   minuterie(ostream &os, double *dest = 0)
      : os_{os}, dest_{dest}
   {
      avant_ = clock();
   }
   ~minuterie() noexcept
   {
      auto apres = clock();
      const double ecoule = static_cast<double>(apres - avant_)/CLOCKS_PER_SEC * 1000.0;
      if (dest_) *dest_ = ecoule;
      os_ << "Temps écoulé: " << ecoule << " ms.";
   }
};
#endif

Nous aurons besoin de calculer des moyennes et des écarts types un peu plus loin. Les algorithmes génériques visibles à droite font ce travail, et le font sur n'importe quelle séquence standard conforme (pour l'écart type, il est important que la séquence contienne au moins deux éléments).

Pour plus d'explications sur ce code, passez voir votre enseignant favori.

#ifndef STATISTIQUES_H
#define STATISTIQUES_H
#include <iterator>
#include <numeric>
#include <cmath>
template <class>
   struct traits_nombre;
template <>
   struct traits_nombre<short>
   {
      using cumul_type = int;
      using reel_type = float;
   };
template <>
   struct traits_nombre<int>
   {
      using cumul_type = long;
      using reel_type = float;
   };
template <>
   struct traits_nombre<float>
   {
      using cumul_type = double;
      using reel_type = float;
   };
template <>
   struct traits_nombre<double>
   {
      using cumul_type = long double;
      using reel_type = double;
   };
// etc.
template <class It, class C>
   typename traits_nombre<
      typename iterator_traits<It>::value_type
   >::reel_type
      moyenne (It debut, It fin, C cumul)
   {
      using value_type = typename
         iterator_traits<It>::value_type;
      using reel_type = typename
         traits_nombre<value_type>::reel_type;
      auto n = std::distance(debut, fin); // ceci peut être optimisé
      return static_cast<reel_type>(accumulate(debut, fin, cumul)) / n;
   }
template <class It>
   typename traits_nombre<
      typename iterator_traits<It>::value_type
   >::reel_type
      moyenne (It debut, It fin)
   {
      using value_type = typename
         iterator_traits<It>::value_type;
      using cumul_type = typename
         traits_nombre<value_type>::cumul_type;
      return moyenne(debut, fin, cumul_type{});
   }
template <class It>
   typename traits_nombre<
      typename iterator_traits<It>::value_type
   >::reel_type
      ecart_type(It debut, It fin)
   {
      using value_type = typename
         iterator_traits<It>::value_type;
      using reel_type = typename
         traits_nombre<value_type>::reel_type;
      const auto MOYENNE = moyenne(debut, fin);
      const auto n = distance(debut, fin); // ceci peut être optimisé
      reel_type cumul = {};
      for (; debut != fin; ++debut)
         cumul += pow(std::abs(static_cast<reel_type>(*debut) - MOYENNE), static_cast<reel_type>(2));
      return sqrt(cumul / (n - 1));
   }
#endif

Le code plus près des délégués

Convertir un booléen en entier (et inversement) dès la compilation est chose somme toute simple. Certaines validations dans le code implémentant les délégués reposent sur la capacité de réaliser agréablement ces conversions.

#ifndef STATIC_BOOL_INT_H
#define STATIC_BOOL_INT_H
template <bool>
   struct static_int_from_bool;
template <>
   struct static_int_from_bool<true>
   {
      static const int value = 1;
   };
template <>
   struct static_int_from_bool<false>
   {
      static const int value = 0;
   };
template <bool>
   struct static_inverse_bool;
template <>
   struct static_inverse_bool<true>
   {
      static const bool value = false;
   };
template <>
   struct static_inverse_bool<false>
   {
      static const bool value = true;
   };
#endif

Nous aurons parfois recours à un type qui représentera l'absence de types (le type Vide). Nous le décrirons de la manière visible à droite. Ici encore, être capable de représenter l'absence de type par un type nous permettra d'exprimer correctement des délégués sur un nombre variable (aux yeux du code client) de paramètres.

#ifndef VIDE_H
#define VIDE_H
#include <type_traits>
class Vide {};
template <class T>
   struct static_type_est_vide
      : std::is_same<T,Vide>
   {
   };
#endif

L'implémentation de délégués que nous utiliserons demande de choisir, à l'occasion, un type parmi plusieurs.

Le static_switch visible à droite joue ce rôle. Le schéma proposé ici pourrait être étendu facilement à un plus grand nombre de types, au besoin, dans la mesure où vous seriez disposé(e) à ajouter le code requis, mais c'est un peu fastidieux...

#ifndef STATIC_SWITCH_H
#define STATIC_SWITCH_H
//
// ajouter des tailles au besoin
//
template <int, class, class>
   struct static_switch2;
template <class T0, class T1>
   struct static_switch2<0, T0, T1>
   {
      using type = T0;
   };
template <class T0, class T1>
   struct static_switch2<1, T0, T1>
   {
      using type = T1;
   };
template <int, class, class, class>
   struct static_switch3;
template <class T0, class T1, class T2>
   struct static_switch3 <0, T0, T1, T2>
{
   using type = T0;
};
template <class T0, class T1, class T2>
   struct static_switch3 <1, T0, T1, T2>
{
   using type = T1;
};
template <class T0, class T1, class T2>
   struct static_switch3 <2, T0, T1, T2>
{
   using type = T2;
};
template <int, class, class, class, class>
   struct static_switch4;
template <class T0, class T1, class T2, class T3>
   struct static_switch4 <0, T0, T1, T2, T3>
{
   using type = T0;
};
template <class T0, class T1, class T2, class T3>
   struct static_switch4 <1, T0, T1, T2, T3>
{
   using type = T1;
};
template <class T0, class T1, class T2, class T3>
   struct static_switch4 <2, T0, T1, T2, T3>
{
   using type = T2;
};
template <class T0, class T1, class T2, class T3>
   struct static_switch4 <3, T0, T1, T2, T3>
{
   using type = T3;
};
#endif

Définir des délégués (concept fréquemment rencontré en C# et en Delphi) est possible en C++, Expliquer le code demanderait beaucoup de temps, mais ça ne veut pas dire que le code soit vraiment difficile à rédiger...

L'essentiel du travail ici a été fait par Patrick Boutot, diplômé du DDJV à l'Université de Sherbrooke. Je l'en remercie!

Quand j'aurai le temps, je retoucherai le tout pour profiter des mécanismes de C++ 11, en particulier des templates variadiques.

#ifndef DELEGUE_H
#define DELEGUE_H
#include "Vide.h"
#include "static_bool_int.h"
#include "static_switch.h"
class bad_function_call {};
//
// static_verification vérifie certaines occurrences de Vide dans
// une séquence de types
//
template <int, class, class, class>
   struct static_verification;
template <class Arg0, class Arg1, class Arg2>
   struct static_verification<0, Arg0, Arg1, Arg2>
   {
      static const bool is_arg0_vide = static_type_est_vide<Arg0>::value;
      static const bool is_arg1_vide = static_type_est_vide<Arg1>::value;
      static const bool is_arg2_vide = static_type_est_vide<Arg2>::value;
      static_assert(is_arg0_vide, "test0");
      static_assert(is_arg1_vide, "test1");
      static_assert(is_arg2_vide, "test2");
   };
template <class Arg0, class Arg1, class Arg2>
   struct static_verification<1, Arg0, Arg1, Arg2>
   {
      static const bool is_arg0_vide = static_type_est_vide<Arg0>::value;
      static const bool is_arg1_vide = static_type_est_vide<Arg1>::value;
      static const bool is_arg2_vide = static_type_est_vide<Arg2>::value;
      static_assert(!is_arg0_vide, "test0");
      static_assert(is_arg1_vide, "test1");
      static_assert(is_arg2_vide, "test2");
   };
template <class Arg0, class Arg1, class Arg2>
   struct static_verification<2, Arg0, Arg1, Arg2>
   {
      static const bool is_arg0_vide = static_type_est_vide<Arg0>::value;
      static const bool is_arg1_vide = static_type_est_vide<Arg1>::value;
      static const bool is_arg2_vide = static_type_est_vide<Arg2>::value;
      static_assert(!is_arg0_vide, "test0");
      static_assert(!is_arg1_vide, "test1");
      static_assert(is_arg2_vide, "test2");
   };
template <class Arg0, class Arg1, class Arg2>
   struct static_verification<3, Arg0, Arg1, Arg2>
   {
      static const bool is_arg0_vide = static_type_est_vide<Arg0>::value;
      static const bool is_arg1_vide = static_type_est_vide<Arg1>::value;
      static const bool is_arg2_vide = static_type_est_vide<Arg2>::value;

      static_assert(!is_arg0_vide, "test0");
      static_assert(!is_arg1_vide, "test1");
      static_assert(!is_arg2_vide, "test2");
   };
template <class T0, class T1, class T2>
   struct nb_params_valides
   {
   // juste pour que ce ne soit pas trop long
   private:
      static const int t0 =
         static_int_from_bool<
            static_inverse_bool<
               static_type_est_vide<T0>::value
            >::value
         >::value;
      static const int t1 =
         static_int_from_bool<
            static_inverse_bool<
               static_type_est_vide<T1>::value
            >::value
         >::value;
      static const int t2 =
         static_int_from_bool<
            static_inverse_bool<
               static_type_est_vide<T2>::value
            >::value
         >::value;
   public:
      static const int value = t0+ t1 + t2;
      static static_verification<value, T0, T1, T2> nb_valides;
   };
//
// Ci-dessous, je devrais sans doute ajouter des destructeurs virtuels
//
template <class R>
   struct delegue_base0
   {
      using result_type = R;
      virtual result_type invoke() const = 0;
      virtual ~delegue_base0() = default;
   };
template <class R, class Arg>
   struct delegue_base1
   {
      using result_type = R;
      using sargument_type = Arg;
      virtual result_type invoke(argument_type) const = 0;
      virtual ~delegue_base1() = default;
   };
template <class R, class Arg0, class Arg1>
   struct delegue_base2
   {
      using result_type = R;
      using first_argument_type = Arg0;
      using second_argument_type = Arg1;
      virtual result_type invoke(first_argument_type, second_argument_type) const = 0;
      virtual ~delegue_base2() = default;
   };
template <class R, class Arg0, class Arg1, class Arg2>
   struct delegue_base3
   {
      using result_type = R;
      using first_argument_type = Arg0;
      using second_argument_type = Arg1;
      using third_argument_type = Arg2;
      virtual result_type invoke(first_argument_type, second_argument_type, third_argument_type) const = 0;
      virtual ~delegue_base3() = default;
   };
template <class R, class T0, class T1, class T2>
   struct static_trouver_delegue_base
   {
   private:
      using t0 = delegue_base0<R>;
      using t1 = delegue_base1<R, T0>;
      using t2 = delegue_base2<R, T0, T1>;
      using t3 = delegue_base3<R, T0, T1, T2>;
      static const int value = nb_params_valides <T0, T1, T2>::value;
   public:
      using type = typename
         static_switch4<value, t0, t1, t2, t3>::type;
   };
//
// Détecteur de la sorte de fonction
//
template <class F, class R, class, class, class>
   struct general_fonction0
      : delegue_base0<R>
   {
      using fct_type = F;
      explicit general_fonction0(fct_type fct)
         : fct_{fct}
      {
      }
      virtual result_type invoke() const
         { return fct_(); }
   private:
      mutable fct_type fct_;
   };
template <class F, class R, class Arg0, class, class>
   struct general_fonction1
      : delegue_base1 <R, Arg0>
   {
      using fct_type = F;
      explicit general_fonction1(fct_type fct)
         : fct_{fct}
      {
      }
      virtual result_type invoke(argument_type arg) const
        { return fct_(arg); };
   private:
      mutable fct_type fct_;
   };
template <class F, class R, class Arg0, class Arg1, class>
   struct general_fonction2
      : delegue_base2 <R, Arg0, Arg1>
   {
      using fct_type = F;
      explicit general_fonction2(fct_type fct)
         : fct_{fct}
      {
      }
      virtual result_type invoke(first_argument_type arg0, second_argument_type arg1) const
         { return fct_(arg0, arg1); }
   private:
      mutable fct_type fct_;
   };
template <class F, class R, class Arg0, class Arg1, class Arg2>
   struct general_fonction3
      : delegue_base3 <R, Arg0, Arg1, Arg2>
   {
      using fct_type = F;
      explicit general_fonction3(fct_type fct)
         : fct_{fct}
      {
      }
      virtual result_type invoke(first_argument_type arg0, second_argument_type arg1, third_argument_type arg2) const
         { return fct_(arg0, arg1, arg2); }
   private:
      mutable fct_type fct_;
   };
template <class F, class R, class T0, class T1, class T2>
   struct static_trouver_general_fonction
   {
   private:
      using t0 = general_fonction0<F, R, T0, T1, T2>;
      using t1 = general_fonction1<F, R, T0, T1, T2>;
      using t2 = general_fonction2<F, R, T0, T1, T2>;
      using t3 = general_fonction3<F, R, T0, T1, T2>;
      static const int nb_valides = nb_params_valides <T0, T1, T2>::value;
   public:
      using type = typename static_switch4<nb_valides, t0, t1, t2, t3>::type;
   };
template <class T, class M, class R, class Arg0, class Arg1, class Arg2>
   struct general_methode0
      : delegue_base0 <R>
   {
      using this_type = T;
      using methode_type = M;
      explicit general_methode0(this_type* t, methode_type m)
         : this_{t}, methode_{m}
      {
      }
      virtual result_type invoke() const
         { return (this_->*methode_)(); }
   private:
      mutable this_type* this_;
      methode_type methode_;
   };
template <class T, class M, class R, class Arg0, class Arg1, class Arg2>
   struct general_methode1
      : delegue_base1 <R, Arg0>
   {
      using this_type = T;
      using methode_type = M;
      explicit general_methode1(this_type* t, methode_type m)
         : this_{t}, methode_{m}
      {
      }
      virtual result_type invoke(argument_type arg) const
         { return (this_->*methode_)(arg); }
   private:
      mutable this_type* this_;
      methode_type methode_;
   };
template <class T, class M, class R, class Arg0, class Arg1, class Arg2>
   struct general_methode2
      : delegue_base2 <R, Arg0, Arg1>
   {
      using this_type = T;
      using methode_type = M;
      explicit general_methode2(this_type* t, methode_type m)
         : this_{t}, methode_{m}
      {
      }
      virtual result_type invoke(first_argument_type arg0, second_argument_type arg1) const
         { return (this_->*methode_)(arg0, arg1); };
   private:
      mutable this_type* this_;
      methode_type methode_;
   };
template <class T, class M, class R, class Arg0, class Arg1, class Arg2>
   struct general_methode3
      : delegue_base3 <R, Arg0, Arg1, Arg2>
   {
      using this_type = T;
      using methode_type = M;
      explicit general_methode3(this_type* t, methode_type m)
         : this_{t}, methode_{m}
      {
      }
      virtual result_type invoke(first_argument_type arg0, second_argument_type arg1, third_argument_type arg2) const
         { return (this_->*methode_)(arg0, arg1, arg2); };
   private:
      mutable this_type* this_;
      methode_type methode_;
   };
template <class T, class M, class R, class T0, class T1, class T2>
   struct static_trouver_general_methode
   {
   private:
      using t0 = general_methode0<T, M, R, T0, T1, T2>;
      using t1 = general_methode1<T, M, R, T0, T1, T2>;
      using t2 = general_methode2<T, M, R, T0, T1, T2>;
      using t3 = general_methode3<T, M, R, T0, T1, T2>;
      static const int nb_valides = nb_params_valides <T0, T1, T2>::value;
   public:
      using type = typename
         static_switch4<nb_valides, t0, t1, t2, t3>::type;
   };
template <class R, class Arg0, class Arg1, class Arg2>
   struct delegue;
template <class R, class Arg0 = Vide, class Arg1 = Vide, class Arg2 = Vide>
   struct delegue
   {
      using result_type = R;
      using first_argument_type = Arg0;
      using second_argument_type = Arg1;
      using third_argument_type = Arg2;
      static const int nombre_argument_valide =
         nb_params_valides<Arg0, Arg1, Arg2>::value;
      using delegue_base_type = typename
         static_trouver_delegue_base<R, Arg0, Arg1, Arg2>::type;
      // La Sainte-Trinité s'applique
      explicit delegue() noexcept : delegue_{} {}
      template <class F>
         explicit delegue(F fct)
            : delegue_{} 
         {
            using delegue_function_type = typename
               static_trouver_general_fonction<F, R, Arg0, Arg1, Arg2>::type;
            delegue_ = new typename delegue_function_type(fct);
         }
      template <class T, class M>
         explicit delegue(T* type, M methode)
            : delegue_{}
         {
            using delegue_methode_type = typename
               static_trouver_general_methode<T, M, R, Arg0, Arg1, Arg2>::type;
            delegue_ = new typename delegue_methode_type(type, methode);
         }
      result_type operator()() const
      {
         verifier_delegue();
         return delegue_->invoke();
      }
      result_type operator()(first_argument_type arg) const
      {
         verifier_delegue();
         return delegue_->invoke(arg);
      }
      result_type operator()(first_argument_type arg0, second_argument_type arg1) const
      {
         verifier_delegue();
         return delegue_->invoke(arg0, arg1);
      }
      result_type operator()(first_argument_type arg0, second_argument_type arg1, third_argument_type arg2) const
      {
         verifier_delegue();
         return delegue_->invoke(arg0, arg1, arg2);
      }
      bool empty() const
         { return !delegue_; }
      bool is_from_the_same_base(const delegue& value) const
         { return value.delegue_ == delegue_; }
      bool operator==(const delegue&)
         const; // can't be implemented
      bool operator!=(const delegue&)
         const; // can't be implemented
      ~delegue() noexcept
         { delete delegue_; }
   private:
      void verifier_delegue() const
         { if(empty()) throw bad_function_call{}; }
   private:
      //std::shared_ptr<delegue_base_type> delegue_;
      delegue_base_type* delegue_;
   };
template <class R>
   delegue<R> creer_delegue(R (f)())
      { return delegue<R>(f); }
template <class R, class T0>
   delegue<R, T0> creer_delegue(R (f)(T0))
      { return delegue<R, T0>(f); }
template <class R, class T0, class T1>
   delegue<R, T0, T1> creer_delegue(R (f)(T0, T1))
      { return delegue<R, T0, T1>(f); }
template <class R, class T0, class T1, class T2>
   delegue<R, T0, T1, T2> creer_delegue(R (f)(T0, T1, T2))
      { return delegue<R, T0, T1, T2>(f); }
template <class R, class T>
   delegue<R> creer_delegue(T &x, R (T::*f)())
      { return delegue<R>(&x, f); }
template <class R, class T, class T0>
   delegue<R, T0> creer_delegue(T &x, R (T::*f)(T0))
      { return delegue<R, T0>(&x, f); }
template <class R, class T, class T0, class T1>
   delegue<R, T0, T1> creer_delegue(T &x, R (T::*f)(T0, T1))
      { return delegue<R, T0, T1>(&x, f); }
template <class R, class T, class T0, class T1, class T2>
   delegue<R, T0, T1, T2> creer_delegue(T &x, R (T::*f)(T0, T1, T2))
      { return delegue<R, T0, T1, T2>(&x, f); }
template <class R, class T>
   delegue<R> creer_delegue(T &x, R (T::*f)() const)
      { return delegue<R>(&x, f); }
template <class R, class T, class T0>
   delegue<R, T0> creer_delegue(T &x, R (T::*f)(T0) const)
      { return delegue<R, T0>(&x, f); }
template <class R, class T, class T0, class T1>
   delegue<R, T0, T1> creer_delegue(T &x, R (T::*f)(T0, T1) const)
      { return delegue<R, T0, T1>(&x, f); }
template <class R, class T, class T0, class T1, class T2>
   delegue<R, T0, T1, T2> creer_delegue(T &x, R (T::*f)(T0, T1, T2) const)
      { return delegue<R, T0, T1, T2>(&x, f); }
#endif

Voilà.

Depuis C++ 14

Le code qui précède est lourd mais fonctionne. Cependant, depuis C++ 14, le tout devient nettement plus simple. Je ne reprendrai pas ici  toutes les explications proposées plus haut, me limitant à expliquer les simplifications, disparitions, ajouts et autres ajustements.

Notez que j'ai évacué le volet « statistiques » pour cette version, dans le but de ne pas obscurcir le propos (si vous souhaitez le réintégrer, il est sensiblement comme celui des exemples précédents). J'ai strictement gardé le volet décrivant le temps écoulé en fonction de la charge de travail et des seuils de séparations des tâches.

La comptabilité sérielle du nombre de mots dans un fichier demeurera telle quelle, à ceci près que l'amusant type whatever devient redondant maintenant que nous avons des expressions λ génériques.

#ifndef COMPTER_MOTS_SERIEL
#define COMPTER_MOTS_SERIEL
#include <algorithm>
#include <numeric>
#include <string>
#include <iterator>
#include <fstream>
int compter_mots_document(const std::string &nom) {
   using namespace std;
   return count_if(
      istream_iterator<string>{ ifstream{ nom } },
      istream_iterator<string>{}, [](auto &&) { return true; }
   );
}
template <class It>
int compter_mots_multidocuments_seriel(It debut, It fin)
{
   using namespace std;
   return accumulate(debut, fin, 0, [](int so_far, const string &nom) {
      return so_far + compter_mots_document(nom);
   });
}
#endif

Le volet Map/ Reduce est simplifié du fait que nous avons recours à std:future et à std::async().

#ifndef COMPTER_MOTS_MAP_REDUCE_H
#define COMPTER_MOTS_MAP_REDUCE_H
#include <thread>
#include <future>
#include <iterator>
template <class It>
   int compter_mots_multidocuments_map_reduce
      (It debut, It fin, int seuil = std::thread::hardware_concurrency()) {
      using namespace std;
      auto n = distance(debut, fin);
      if (n < seuil)
         return compter_mots_multidocuments_seriel(debut, fin);
      auto f_ = async(compter_mots_multidocuments_map_reduce<It>, debut, next(debut, n / 2), seuil),
           g_ = async(compter_mots_multidocuments_map_reduce<It>, next(debut, n / 2), fin, seuil);
      return f_.get() + g_.get();
   }
#endif

J'ai écrit une version simplifiée de l'algorithme moyenne() mais acceptant une fonction de transformation des données cumulées.

La raison pour ce changement est que dans cette version, le code de test (plus bas) cumulera le temps non pas sur des double, mais bien sur des high_resolution_clock::duration, ce qui impliquera éventuellement d'aller chercher le nombre de tics cumulés (méthode count()) pour réaliser la division qu'implique ce calcul.

Dans les cas où aucune transformation du cumul n'est requise, le foncteur noop sera utilisé en lieu et place de cette transformation.

#ifndef MOYENNE_H
#define MOYENNE_H
#include <numeric>
#include <cassert>
#include <iterator>
template <class It, class C = typename std::iterator_traits<It>::value_type>
   auto somme(It debut, It fin, C init = {}) {
      return std::accumulate(debut, fin, init);
   }
auto noop = [](auto && arg) -> auto && {
   return std::forward<decltype(arg)>(arg);
};
template <class R, class It, class SumT>
   R moyenne(It debut, It fin, SumT sumT) {
      using namespace std;
      assert(debut != fin);
      return sumT(somme(debut, fin)) / static_cast<R>(distance(debut, fin));
   }
template <class R, class It>
   R moyenne(It debut, It fin) {
      return moyenne<R>(debut, fin, noop);
   }
#endif

Le programme de test suit. Remarquez d'office à quel point l'échafaudage a été simplifié dans cet exemple plus contemporain (vous pouvez comparer avec l'exemple précédent pour faire le constat vous-mêmes).

La fonction terminaison(), décorative, peut maintenant être résolue à la compilation, et il en va de même pour la fonction config().

// ...
// en-têtes maison, ci-dessus, omis pour fins de simplicité
// ...
#include <sstream>
#include <iostream>
#include <iterator>
#include <algorithm>
#include <string>
#include <chrono>
using namespace std;
using namespace std::chrono;
static constexpr const char *terminaison(bool pluriel) {
   return pluriel ? "s" : "";
}
static constexpr const char *config() {
#ifdef _DEBUG
   return "(DEBUG)";
#else
   return "(RELEASE)";
#endif
}

J'ai écrit une fonction générique de test, retournant une paire faire du résultat d'un calcul et du temps écoulé pour l'effectuer. Ceci remplace avantageusement la classe de minuterie des exemples précédents.

template <class F, class ... Args>
   auto tester(F f, Args && ... args) {
      auto avant = high_resolution_clock::now();
      auto res = f(std::forward<Args>(args)...);
      auto apres = high_resolution_clock::now();
      return make_pair(res, apres - avant);
   }

Les tests sériel et parallèle n'ont que peu changé, outre le fait qu'ils sont plus simples dû au mécanisme de test dont nous venons de discuter.

Les expressions λ sont un outil merveilleux pour simplifier l'écriture des programmes.

template <class It>
   auto tester_seriel(It debut, It fin, ostream &sortie, int n_essai) {
      sortie << "Approche sérielle, essai " << n_essai << endl;
      auto res = tester([debut, fin]() {
         return compter_mots_multidocuments_seriel(debut, fin);
      });
      sortie << '\t' << res.first << " mots au total\n"
             << '\t' < duration_cast<milliseconds>(res.second).count()
             << " ms" << endl;
      return res.second;
   }
template <class It>
   auto tester_parallele(It debut, It fin, ostream &sortie, int n_essai, int seuil) {
      sortie << "Approche parallele, essai " << n_essai
             << ", seuil sequentiel " << seuil << endl;
      auto res = tester([debut, fin, seuil]() {
         return compter_mots_multidocuments_map_reduce(debut, fin, seuil);
      });
      sortie << '\t' << res.first << " mots au total\n"
             << '\t' << duration_cast<milliseconds>(res.second).count()
             << " ms" << endl;
      return res.second;
   }

J'ai modifié le programme en soi sur quelques aspects :

  • Les temps sont mesurés en fonction des outils de chrono, plus rapides et plus rigoureux
  • Les seuils de séparation du travail sont plus pertinents
  • Par défaut, les tests parallèles utilisent un seuil qui utilise tous les coeurs (ici, je triche un peu en faisant semblant que main() n'est pas un thread pour que l'exemple demeure simple)
  • Le calcul de la moyenne est adapté pour tenir compte des changements décrits plus haut
int main(int argc, char *argv[]) {
   auto debut = &argv[1], fin = &argv[argc];
   ofstream sortie{ "sortie.txt" };
   const int NDOCS = argc - 1;
   enum { NTESTS = 10 };
   sortie << "Compter les mots, " << NDOCS
          << " document" << terminaison(argc > 2)
          << ", " << NTESTS << " test" << terminaison(NTESTS > 2)
          << ' ' << config() << "\n\t";
   copy(debut, fin, ostream_iterator<string>{sortie, "\n\t"});
   sortie << endl;
   int seuils[] { 64, 128, 256, 512, 1024 };
   high_resolution_clock::duration
      resultats_sequentiel[NTESTS] = { },
      resultats_parallele[NTESTS] = { },
      resultats_parallele_seuil[size(seuils)][NTESTS] = { { } };
   for (int i = 0; i < NTESTS; ++i) {
      resultats_sequentiel[i] = tester_seriel(debut, fin, sortie, i + 1);
      resultats_parallele[i] = tester_parallele(
         debut, fin, sortie, i + 1, distance(debut, fin) / thread::hardware_concurrency()
      );
      for (size_t j = 0; j != size(seuils); ++j)
         resultats_parallele_seuil[j][i] = tester_parallele(
            debut, fin, sortie, i + 1, seuils[j]
         );
      sortie << endl;
      cout << '.' << flush;
   }
   cout << endl;
   double moyennes[size(seuils)];
   for (size_t i = 0; i < size(seuils); ++i)
      moyennes[i] = moyenne<double>(
         begin(resultats_parallele_seuil[i]), end(resultats_parallele_seuil[i]),
         [](high_resolution_clock::duration t) {
            return static_cast<double>(t.count());
         }
      );
   sortie << "\n\nMeilleur temps moyen: ";
   double *p = min_element(begin(moyennes), end(moyennes));
   sortie << *p << " ms. avec un seuil sequentiel de " << static_cast<int>(p - moyennes + 1) << endl;
}

Voilà : une solution nettement plus simple, plus compréhensible, et probablement  plus efficace.


Valid XHTML 1.0 Transitional

CSS Valide !