Pipeline générique

Le pipeline est un schéma de conception bien connu dans le monde du parallélisme. Il s'exprime comme suit :

Un pipeline de transformations sera efficace s'il est plein, et si les diverses transformations sont de complexité semblable. Ainsi, si la cardinalité de l'ensemble d'échantillons à traiter est grande, alors :

Il est bien sûr possible de construire des pipelines ad hoc pour résoudre des problèmes de parallélisme spécifiques. Je vous propose ici une implémentation générique limitée à des transformations unaires (un paramètre, type de retour non-void), qu'il s'agisse de fonctions ou de foncteurs.

Certaines généralités sont présumées connues de votre part dans ce qui suit :

Vous trouverez ici deux versions du pipeline générique :

Sans surprises, la version C++ 17 est plus concise et plus simple. Si votre compilateur est à jour, c'est donc la version à privilégier.

Version C++ 17

La version qui suit est une implémentation d'un pipeline générique reposant sur les outils et idiomes propres à C++ 17. Pour une version se limitant aux outils et idiomes de C++ 03, voir plus bas.

Programme de test

Le programme suivant nous servira de programme de test pour un pipeline se voulant générique.

Principal.cpp

Pour les besoins de cet exemple, nous limiterons nos inclusions à des en-têtes standards, outre Pipeline.h lui-même qui nous donnera accès à la classe Pipeline située au coeur de nos préoccupations.

#include "Pipeline.h"
#include <type_traits>
#include <algorithm>
#include <iterator>
#include <memory>
#include <vector>
#include <string>
#include <string_view>
#include <locale>
#include <iterator>
#include <iostream>
using namespace std;

Nous appliquerons plusieurs opérations dans notre exemple de pipeline.

Ces opérations seront, pour les besoins de la cause, une combinaison de fonctions, de foncteurs et de λ.

L'une de ces opérations sera la fonction majuscules(), à droite, qui consommera une chaîne de caractères et retournera une chaîne équivalente mais dont les caractères sont tous des majuscules. Utiliser la même instance de locale tout au long du processus de transformation est une économie importante en termes de temps d'exécution (le constructeur d'un std::locale est une opération dispendieuse).

string majuscules(string s, locale loc = { "" }) {
   transform(begin(s), end(s), begin(s), [&](char c) {
      return toupper(c, loc);
   });
   return s;
}

Une autre de ces opérations sera la fonction inverser(), qui consommera une chaîne de caractères et retournera une chaîne équivalente mais dont les caractères apparaîtront dans l'ordre inverse de celui d'origine. Notez que j'ai utilisé l'opérateur , pour séparer les deux expressions que sont l'appel à std::reverse() et le retour de la chaîne s ainsi modifiée, résultant en une seule expression (composite) plutôt que deux.

string inverser(string s) {
   return reverse(begin(s), end(s)), s;
}

Le foncteur alterner_casse consomme chaque fois une chaîne de caractères, et produit en alternance une version en majuscules ou en minuscules de cette chaîne. Il serait possible de simplifier ce foncteur, mais le résultat serait probablement moins rapide à l'exécution que ne l'est l'original (celui présenté ici).

class alterner_casse {
   bool upper;
public:
   alterner_casse(bool upper = false)
      : upper{ upper }
   {
   }
   string operator()(string s, locale loc = { "" }) {
      if (upper)
         transform(begin(s), end(s), begin(s), [&](char c) {
            return toupper(c, loc);
         });
      else
         transform(begin(s), end(s), begin(s), [&](char c) {
            return tolower(c, loc);
         });
      upper = !upper;
      return s;
   }
};

La fonction dedoubler() consomme une chaîne de caractères et retourne une chaîne de caractères contenant deux occurrences congituës de la chaîne originale.

string dedoubler(const string &s) {
   return s + s;
}

Enfin, la fonction compter_lettres() retourne le nombre de caractères dans la chaîne de caractères passée en paramètre. Cette version est plus que banale, mais une variante simple prendrait un critère pred en paramètre et compterait chaque caractère c tel que pred(c).

string::size_type compter_lettres(string_view s) {
   return s.size();
}

Il nous reste à examiner le fonctionnement du pipeline en tant que tel, de même que le code de test que j'ai utilisé pour réaliser une validation sommaire de cette mécanique.

Pour alléger l'écriture de la création d'un pipeline à partir d'une séquence d'opérations, j'utiliserai la fonction générique make_pipeline() dont le premier paramètre sera un atomic<bool>& représentant le signal de fin de l'alimentation du pipeline et dont les paramètres subséquents seront les opérations à installer dans le pipeline, dans l'ordre (parce que les opérations peuvent ne pas être commutatives).

Le recours à un booléen atomique n'est pas anodin, du fait qu'un accès sans synchronisation ni atomicité à une variable de manière concurrente par plusieurs threads, incluant au moins l'un d'eux en écriture, mènerait à un comportement indéfini. Il faut aussi comprendre que, malgré son nom, cette variable ne signifie pas que les threads doivent se terminer dans l'immédiat, mais bien que le pipeline est informé que sa première étape ne sera plus alimentée. Conséquemment, lorsque celle-ci aura complété de traiter les données lui ayant été soumises, elle pourra signaler à la suivante que celle-ci ne sera plus alimentée non plus, et ainsi de suite.

template <class T, class ... Ops>
   Pipeline<T> make_pipeline(atomic<bool> &meurs, Ops && ... ops) {
      return Pipeline<T>(meurs, forward<Ops>(ops)...);
   }

Le code de test se présente comme suit :

  • la variable booléenne atomique meurs est initialisée à false, de manière à signaler qu'il n'est pas encore temps d'arrêter le traitement;
  • un pipeline nommé pipeline est créé à l'aide de make_pipeline(). Un certain nombre d'étapes y sont installées, la plupart sur des chaînes de caractères. Dans la version présentée à droite, le pipeline traite d'ailleurs des chaînes de caractères en entrée et produit des chaînes de caractères en sortie. Cependant, le pipeline fonctionne aussi sur d'autres types, ce que vous pouvez vérifier en retirant le commentaire autour de l'insertion dans le pipeline de compter_lettres et en remplaçant le typedef de result_type dans le programme actuel par celui présentement en commentaires, pour refléter ce changement;
  • un tableau de données entrante nommé donnees est ensuite défini, et servira à alimenter le pipeline. évidemment, ce très petit exemple ne permet pas de mettre en valeur la richesse de notre démarche;
  • ensuite, le signal de fin de l'alimentation du pipeline est donné, en déposant true dans meurs;
  • enfin, le programme de test cumule dans res accumule les fruits du calcul réalisé dans le pipeline, et ce jusqu'à ce que res ait autant d'éléments que n'en avait donnees, donc jusqu'à ce que chacune des données sources ait été traitée. Cette approche présume que le pipeline aura autant de données en sortie qu'il n'avait d'entrées, ce qui peut ne pas être véridique en général (en fonction des étapes du pipeline... Imaginez que l'une des étapes soit un filtre, par exemple), donc je vous invite à la prudence ici.

Pour le programme à droite, l'exécution mènera à l'affichage suivant :

m'jm'j
NMNM
!frp chc!frp chc

... ce qui, je pense, est conforme aux attentes.

int main() {
   atomic<bool> meurs {};
   auto loc = locale{""};
   auto pipeline = make_pipeline<string>(
      meurs, majuscules,
           inverser,
           alterner_casse(),
           [&](string s) -> string {
              auto fin = remove_if(begin(s), end(s), [&](char c) -> bool {
                 c = tolower(c, loc);
                 return c == 'a' || c == 'e' || c == 'i' || c == 'o' || c == 'u' || c == 'y';
              });
              return string(begin(s), fin);
           },
           dedoubler/*,
           compter_lettres*/
   );
   string donnees []  {
      "J'aime", "mon", "chic prof!"
   };
   pipeline.feed(begin(donnees), end(donnees));
   meurs = true;
   auto nadded = distance(begin(donnees), end(donnees));
   int nprocessed = {};
   using result_type = string;
   // using result_type = string::size_type;
   vector<result_type> res;
   for (auto v = pipeline.exiting<result_type>().extraire();
        res.insert(end(res), begin(v), end(v)), nprocessed += v.size(), nprocessed < nadded;
        v = pipeline.exiting<result_type>().extraire())
      ;
   for(const auto &val : res)
      cout << val << endl;
}

Classe zone_transit

L'idée d'une zone de transit pour assurer une communication symchronisée entre deux threads constitue une structure de données classique en multiprogrammation.

zone_transit.h

Dans ce cas bien précis, du fait que chaque zone de transit a un seul producteur et un seul consommateur, il aurait été possible d'utiliser une structure de données sans synchronisation. Cependant, j'y suis allé pour la simplicité avec une structure de données générique à l'interface simple, comme le montre l'exemple de code à droite.

La généricité est importante ici puisqu'il est possible, dans le pire cas, que chaque zone de transit d'un même pipeline opère sur un type distinct; pensez à un pipeline qui prendrait une liste de mots, la transformerait en une liste de catégories de mots (verbes, noms, déterminants, etc.) puis transformerait cette liste en paires {catégorie,nombre d'occurrences}.

Vous remarquerez que, du fait que mon implémentation insère en fin de conteneur et ne permet que l'extraction de tous les éléments d'un coup, j'utilise un vecteur comme substrat d'entreposage. Si vous souhaitez permettre d'extraire un élément à la fois, envisagez un std::deque.

Notez toutefois que des extractions de granularité trop fine peuvent mener à une synchronisation trop fréquente de la zone de transit, et ainsi à une dégradation des performances de votre conteneur.

Il est intéressant d'examiner brièvement la stratégie RAII de synchronisation des accès à l'attribut data. Voici comment notre implémentation procède :

  • les méthodes qui ont besoin de synchronisation sont celles qualifiées volatile. Cette qualification est une indication pour le compilateur de privilégier ces méthodes aux autres de même nom et de signature semblable lorsque la zone_transit est volatile, ce qui constitue la manière par laquelle la programmeuse ou le programmeur indique au compilateur que cet objet est susceptible d'être accédé concurremment par plusieurs threads;
  • le std::mutex nommé m_ est qualifié mutable, ce qui lui permet d'échapper à la qualification const, mais pas à la qualification volatile, ce qui signifie que les accès à cette variable doivent être encadrés par un const_cast;
  • le recours à un const_cast lors de chaque accès à m dans une méthode volatile est correct mais fastidieux;
  • pour alléger cette écriture, j'ai défini la méthode verrouiller(), elle-même const et volatile. Cette méthode verrouille m à l'aide d'un std::unique_lock appliqué sur un std::mutex. Un unique_lock est un objet RAII; il libérera le mutex en sa possession avant d'être détruit, garantissant que le mutex ainsi soit libéré, mais est aussi un objet déplaçable, ce qui permet à une fonction de verrouiller un mutex et de le retourner déjà verrouillé;
  • en agissant ainsi, l'écriture du code client devient simple : déclarer une variable (ici : verrou) en utilisant le « type » auto, pour alléger l'écriture, et lui affecter la valeur retournée par verrouiller(). Puisque verrouiller() encadre à la fois la saisie du mutex et la gestion des qualifications de sécurité, le code client (les méthodes qualifiées volatile) s'en trouve réduit au plus strict minimum, et ce sans compromis pour la sécurité. C'est ce que nous souhaitons.
#ifndef ZONE_TRANSIT_H
#define ZONE_TRANSIT_H
#include <vector>
#include <algorithm>
#include <memory>
#include <mutex>
template <class T, template <class, class> class C = std::vector, class A = std::allocator<T>>
   class zone_transit {
      C<T,A> data;
      mutable std::mutex m;
   public:
      std::unique_lock<std::mutex> verrouiller() const volatile {
         return std::unique_lock<std::mutex>(const_cast<std::mutex&>(m));
      }
      //
      // Interface rapide mais non-sécurisée
      //
      void ajouter(const T &elem) {
         data.push_back(elem);
      }
      template <class It>
         void ajouter(It debut, It fin) {
            using std::end;
            data_.insert(end(data), debut, fin);
         }
      C<T,A> extraire() {
         C<T,A> temp;
         temp.swap(data);
         return temp;
      }
      bool empty() const {
         return data.empty();
      }
      //
      // Interface sécurisée mais lente
      //
      void ajouter(const T &elem) volatile {
         auto verrou = verrouiller();
         const_cast<zone_transit&>(*this).ajouter(elem);
      }
      template <class It>
         void ajouter(It debut, It fin) volatile {
            auto verrou = verrouiller();
            const_cast<zone_transit&>(*this).ajouter(debut, fin);
         }
      C<T,A> extraire() volatile {
         auto verrou = verrouiller();
         return const_cast<zone_transit&>(*this).extraire();
      }
      bool empty() const volatile {
         auto verrou = verrouiller();
         return const_cast<const zone_transit&>.empty();
      }
   };
#endif

La classe générique Pipeline<T>

Il nous reste à voir comment fonctionne le pipeline en soi, et comment s'exprime la mécanique qui le sous-tend.

Pipeline.h

Ce pipeline reposera strictement sur des outils standards et d'autres construits à partir d'outils standards et appliquant des idiomes de programmation et des schémas de conception connus. C'est l'une des qualités de cette version : elle est pleinement portable, n'ayant aucune dépendance envers quelque plateforme ou système d'exploitation spécifique que ce soit.

#ifndef PIPELINE_H
#define PIPELINE_H
#include "zone_transit.h"
#include "memory_ext.h"
#include <vector>
#include <memory>
#include <type_traits>
#include <cassert>
#include <atomic>
#include <functional>
#include <thread>

L'éventuel arrêt de l'exécution du pipeline sera provoqué par une cascade d'événements :

  • Le signal par le fournisseur du pipeline que les données entrantes sont épuisées
  • Le constat par la première étape du pipeline qu'elle ne recevra plus de données et que ce qu'elle avait à traiter est épuisé
  • Le constat par la deuxième étape du pipeline qu'elle ne recevra plus de données et que ce qu'elle avait à traiter est épuisé, etc.

Lorsque deux étapes et interagissent, alimente , donc peut signaler à que ce dernier ne sera plus alimenté, alors que doit pouvoir lire ce signal sans toutefois être en mesure de le modifier. Pour représenter ceci, manipulera un std::atomic<bool> mais exposera à un read_only_signal sur ce booléen atomique. Il serait évidemment simple de généraliser cette pratique à d'autres types que bool.

 

class read_only_signal {
   std::atomic<bool> &signal;
public:
   read_only_signal(std::atomic<bool> &signal)
      : signal{ signal }
   {
   }
   operator bool() const {
      return signal;
   }
};

La classe Pipeline<T> est générique pour une raison technique intéressante :

  • tout pipeline instancié à partir de ce modèle prendra en paramètre une suite d'opérations unaires (un seul intrant) dont nous ne connaissons de prime abord ni le type du paramètre, ni le type de retour;
  • certaines de ces fonctions peuvent être des foncteurs génériques, de complexité arbitraire (par exemple des objets représentant une composition de fonctions), ce qui fait qu'il n'est pas possible de prime abord de connaître les types impliqués dans le pipeline sans savoir le type du premier paramètre;
  • de plus, notre implémentation utilise un canal (une zone de transit) entre chaque étape du pipeline, pour permettre à l'une des étapes d'être plus rapide qu'une autre sans causer de problèmes particuliers (outre celui de voir certains canaux accumuler des données en grande quantité, ce qui peut coûter cher en termes de ressources);
  • ainsi, une exigence de notre implémentation sera de demander au code client de fixer le type du premier paramètre, ce qui permet de fixer par transitivité le type de retour de chacune des étapes, et de cette manière le type des canaux entre chacune des étapes et à la toute fin du pipeline (le « type du pipeline », pour qui prendra le pipeline comme une sorte de fonction unaire).
template <class T>
   class Pipeline {
   public:

L'insertion d'un canal de communication entre les étapes dépend du type des données qui transitent à travers le pipeline. Si les opérations du pipeline sont , et si est le type du paramètre passé à , alors le type passé à sera le type du résultat de appliqué à un , le type du paramètre passé à sera le type du résultat de appliqué à un , etc.

Cela dit, les canaux qui servent de transit entre les étapes du pipeline doivent avoir un type commun, du moins si nous souhaitons placer les données sortantes d'une étape destinées à devenir les données entrantes d'une étape quelque part pour éviter le bloquer l'une en attendant que l'autre soit prête.

Le problème bien sûr est que chacun des canaux est une zone_transit<U> pour un type de donnée (soit ), alors qu'un conteneur en C++ est générique sur la base du type de données qu'il contient (p. ex. : un vector<int> contient des int). Pour placer tous les canaux dans un même conteneur, nous procédons ici à une forme ciblée d'effacement de type :

  • Un Canal n'est pas une classe générique (à ceci près qu'il soit interne à Pipeline<T> qui, lui, est générique)
  • Un Canal encapsule un unique_ptr<transit>
  • La classe transit est définie sans avoir d'autre fonctionnalité qu'un destructeur virtuel, ceci dans le but de faciliter l'éventuel nettoyage des données
  • La classe transit a autant d'enfants transit_impl<U> qu'il y a de types de données à faire transiter
  • Puisque tous les transit_impl<U> sont en fait des transit, il est possible de les entreposer dans un conteneur de unique_ptr<transit>
  • Le « problème » est de savoir quoi y entreposer (quel type) et, de la même manière, quoi en tirer (quel type). Heureusement, puisqu'un Canal se situe entre deux opérations et , où est le fournisseur et est le consommateur, il est clair que ces deux homologues sont en mesure de déterminer le type pour accéder au transit_impl<U> placé entre eux, et il est possible de réaliser ce transtypage de manière sécuritaire (par design) et efficace (en escamotant des tests dynamiques rendus caduques par construction).
      class Canal {
         struct transit {
            virtual ~transit() = default;
         };
         template <class U>
            struct transit_impl : transit {
               volatile zone_transit<U> zone;
            };
         std::unique_ptr<transit> p_;
         Canal() = default;
      public:
         Canal(const Canal&) = delete;
         Canal& operator=(const Canal&) = delete;
         template <class U>
            static std::shared_ptr<Canal> creer() {
               using namespace std;
               //
               // On opère par valeur
               //
               using type = typename
                  remove_reference<
                     typename remove_cv<U>::type
                  >::type;
               shared_ptr<Canal> p { new Canal };
               p->p_ = make_unique<transit_impl<type>>();
               return p;
            }
         template <class U>
            volatile zone_transit<U> &zone() {
               return static_cast<transit_impl<U>&>(*p_).zone;
            }
      };

Un peu comme les canaux placés entre les étapes sont encapsulés par la classe Canal, où chaque classe transit_impl<U> est abstraite par un parent transit réduit à l'extrême, les étapes sont représentées par la classe Tache, non-générique, dont :

  • La méthode start() lance un thread sur sa méthode abstraite executer()
  • Le destructeur attend la fin du thread en question, et
  • Qui encapsule les accès aux canaux entrant et sortant

Le réel travail des étapes prises une à une est réalisé par des instances de chacune des classes TacheImpl<F,A> (plus bas), mais c'est leur parent commun Tache qui assure l'encadrement de leur travail.

Quelques remarques à propos de la méthode Tache::start() :

  • Elle passe en paramètre completed_, un atomic<bool>&, plutôt que d'en faire un attribut protégé. Ceci réduit quelque peu la surface d'exposition de cet attribut, en restreignant l'accès « évident » à la méthode executer() des enfants de Tache
  • Elle lance un thread sur la méthode executer() en question, et
  • Elle se détache du thread ainsi lancé, pour que son exécution se poursuive même après le destructeur de th. Ceci signifie que d'autres mécanismes doivent être mis en place pour constater l'éventuel arrêt du thread – ici, une lecture de l'attribut completed_, que chaque enfant est (sémantiquement) tenu de mettre à true sur complétion de son exécution
      class Tache {
      public:
         using canal_type = std::shared_ptr<Canal>;
      private:
         canal_type in_, out_;
         read_only_signal die;
         std::atomic<bool> completed {};
      public:
         Tache(read_only_signal die) : die{ die } {
         }
         void connect(canal_type in, canal_type out) {
            in_ = in;
            out_ = out;
         }
         bool should_die() const {
            return die;
         }
         virtual ~Tache() { // hum...
            while (!completed_) std::this_thread::yield();
         }
         void start() {
            std::thread th([&]() {
               executer(completed);
            });
            th.detach();
         }
         read_only_signal completed_signal()
            { return completed; }
      protected:
         virtual void executer(std::atomic<bool>&) = 0;
         canal_type &in() { return in_; }
         canal_type &out() { return out_; }
      };

Chaque étape plus spécifique est représentée par une instance de TacheImpl<F,A> où :

  • A est le type des données entrantes (le type du « paramètre » de l'opération au coeur de l'étape); et
  • F est le type de l'opération qui sera appliquée à chaque A.

Toutes les instances de TacheImpl<F,A> reçoivent à la construction un read_only_signal, qui deviendra vrai seulement quand leur fournisseur affirmera qu'il ne leur livrera plus de nouvelles données, de même qu'un F qui, à l'interne, sera nommé f_.

Le réel rôle d'une TacheImpl<F,A> est décrit par sa méthode executer(), c'est-à-dire :

  • Consommer un A du canal entrant
  • Appliquer f_ sur ce A, et
  • Déposer le résultat dans le canal sortant

Lorsqu'il sera certain que le TacheImpl<F,A> ne recevra plus de nouvelles données et lorsque toutes les données en cours de traitement auront été traitées, le signal completed (reçu en paramètre) sera donné pour que l'étape suivante dans le pipeline soit informée qu'elle ne sera désormais plus alimentée.

   private:
      template <class F, class A>
         struct TacheImpl : Tache {
            F f_;
            TacheImpl(F f, read_only_signal die)
               : Tache{die}, f_{f}
            {
            }
            void executer(std::atomic<bool> &completed) {
               using argument_type = A;
               using result_type = decltype(f_(std::declval<A>()));
               using namespace std;
               auto &entree = in()->zone<argument_type>();
               auto &sortie = out()->zone<result_type>();
               while (!should_die() || !entree.empty()) {
                  auto to_process = entree.extraire();
                  if (!to_process.empty()) {
                     vector<result_type> res;
                     res.reserve(to_process.size());
                     transform(begin(to_process), end(to_process), back_inserter(res), [&](argument_type arg) {
                        return f_(arg);
                     });
                     sortie.ajouter(begin(res), end(res));
                  }
               }
               completed = true;
            }
         };

Pour des raisons pratiques, les canaux seront placés dans des pointeurs intelligents avec sémantique de partage (des shared_ptr), alors que les étapes seront placées dans des pointeurs intelligent avec responsabilité unique sur le pointé (des unique_ptr).

// ...
   public:
      using tache_ptr_type = std::unique_ptr<Tache>;
   private:
      std::vector<tache_ptr_type> tache_;
      std::vector<std::shared_ptr<Canal>> canal_;
   public:
      using size_type = typename
         std::vector<tache_ptr_type>::size_type;

Ajouter une étape fct au pipeline se fait comme suit :

  • le pipeline obtient une référence sur le dernier canal inséré jusqu'ici;
  • une instance de TacheImpl appropriée est ajoutée au pipeline;
  • un canal du type résultant de l'action de fct sur son paramètre est ajouté à la liste des canaux;
  • la TacheImpl tout juste insérée est connectée à ses canaux entrant et sortant; et
  • le signal de fin de l'étape nouvellement insérée est retournée, pour être éventuellement passée à la prochaine étape.
// ...
   private:
      template <class A, class F>
         read_only_signal add_step(F && fct, read_only_signal die)
         {
            auto in = canal_.back();
            tache_.push_back(
               make_unique<TacheImpl<F, A>>(std::forward<F>(fct), die)
            );
            add_channel<decltype(fct(std::declval<A>()))>();
            auto out = canal_.back();
            tache_.back()->connect(in, out);
            return tache_.back()->completed_signal();
         }

Ajouter un canal d'un certain type est banal dans la mesure où l'on a compris comment fonctionne la mécanique des canaux.

      template <class U>
         void add_channel() {
            canal_.push_back(Canal::creer<U>());
         }

Insérer une étape t0 de type T0 au pipeline, d'une manière telle que t0 s'applique à une instance d'un certain type U, est une tâche simple dans le contexte de la mécanique mise en place jusqu'ici. La valeur retournée est un signal de fin d'alimentation destiné à la prochaine étape du pipeline.

      template <class U, class T0>
         read_only_signal build_chaining(
            read_only_signal die, T0 &&t0
         )
         {
            return add_step<U>(std::forward<T0>(t0), die);
         }

La généralisation de l'insertion d'étapes à un pipeline implique ajouter la première étape du lot, déterminer le type de ce que cette étape produira, et insérer les étapes subséquentes. La subtilité du traitement dans ce cas tient au relais des signaux de complétion d'une étape à l'autre.

// ...
      template <class U, class T0, class ... Rest>
         read_only_signal build_chaining(
            read_only_signal, T0 &&t0, Rest &&... rest
         )
         {
            using std::forward;
            auto completed_next = build_chaining<U>(die, forward<T0>(t0));
            using type = decltype(t0(std::declval<U>()));
            return build_chaining<type>(completed_next, forward<Rest>(rest)...);
         }

Démarrer le pipeline, à partir de sa méthode start(), signifie lancer chacune des étapes du pipeline pour que celles-ci opèrent toutes en parallèle.

      void start() {
         for(auto & p : tache_)
            p->start();
      }

Construire un pipeline à partir d'un signal de complétion du fournisseur et d'une séquence d'étapes implique construire un canal pour les données entrantes et construire la chaîne d'étapes subséquentes.

   public:
      template <class... Ops>
         Pipeline(read_only_signal die, Ops && ... ops) {
            add_channel<T>();
            build_chaining<T>(die, std::forward<Ops>(ops)...);
         }

Accéder aux canaux entrant et sortant du pipeline sont deux opérations banales pour qui connaît les types impliqués.

      template <class U>
         volatile zone_transit<U> &entering() {
            return canal_.front()->zone<U>();
         }
      template <class U>
         volatile zone_transit<U> &exiting() {
            return canal_.back()->zone<U>();
         }

Enfin, nourrir le pipeline avec de nouvelles données entrantes implique démarrer l'exécution dudit pipeline. Le recours à un std::once_flag permet d'assurer que le code de démarrage du pipeline ne sera exécuté qu'une seule fois par programme.

      template <class U>
         void feed(U && elem) {
            entering<U>().ajouter(std::forward<U>(elem));
            std::call_once(fanion_, [&] { start(); });
         }
   private:
      std::once_flag fanion_;
   public:
      template <class It>
         void feed(It debut, It fin) {
            using value_type = typename
               std::iterator_traits<It>::value_type;
            entering<value_type>().ajouter(debut, fin);
            std::call_once(fanion_, [&] { start(); });
         }
   };
#endif

Version C++ 03

Ce qui suit détaille les éléments clés d'une implémentation d'un pipeline générique avec C++ 03. Je vous recommande de n'y avoir recours que si votre compilateur n'est pas à jour, car le tout est significativement plus fastidieux et plus lourd que la version C++ 17 proposée un peu plus haut.

Programme de test

Principal.cpp

Débutons les explications par une présentation de ce qui sera le programme de test. Ceci nous permettra de voir comment il serait possible, en pratique, d'utiliser le pipeline avec des opérations quelconques.

Les contraintes d'utilisation détaillées de cette implémentation du schéma de conception pipeline sont décrites plus bas (fichiers Pipeline.h et Pipeline.cpp). Pour le moment, nous limiterons nos explications à ceci :

  • une instance de la classe Pipeline sera instanciée avec une série d'opérations ;
  • ces opérations doivent être telles que, pour , le type de ce que retourne puisse être converti dans le type de ce que reçoit en paramètre l'opération ;
  • le canal entrant d'une instance de Pipeline permet au code client d'alimenter l'opération ;
  • le canal sortant d'une instance de Pipeline de opérations permet au code client de consommer le résultat de l'opération .
#include "Pipeline.h"
#include <type_traits>
#include <algorithm>
#include <iterator>
#include <memory>
#include <vector>
#include <string>
#include <locale>
#include <iterator>
#include <iostream>
using namespace std;

Notre programme de test consistera en un pipeline de quatre étapes, opérant sur des chaînes de caractères standard (des std::string; j'écrirai simplement string ci-dessous par souci de simplicité). Notez que les types des opérations pourraient être plus variés, ceci n'était qu'un exemple.

Les étapes de ce pipeline seront constituées d'un mélange de fonctions et de foncteurs unaires :

  • la fonction majuscules(), qui prend en paramètre une string et retourne une string équivalente à ceci près que tous ses caractères alphabétiques sont transformés en équivalentes majuscules;
string majuscules(const string &s)
{
   string res;
   res.reserve(s.size());
   locale loc("");
   transform(
      begin(s), end(s), back_inserter(res),
      [&loc](const char c) {
         return toupper(c, loc);
      }
   );
   return std::move(res);
}
  • la fonction inverser(), qui prend en paramètre une string et retourne une string équivalente à ceci près que les caractères y apparaissent dans l'ordre inverse de ceux de la chaîne d'origine;
string inverser(string s)
{
   reverse(begin(s), end(s));
   return std::move(s);
}
  • le foncteur alterner_casse, qui prend en paramètre une string et retourne une std::string équivalente à ceci près que tous ses caractères sont majuscules ou minuscules (en alternance); et
class alterner_casse
{
   bool upper_;
public:
   typedef string argument_type;
   typedef string result_type;
   alterner_casse(bool upper = false)
      : upper_(upper)
   {
   }
   string operator()(const string &s)
   {
      string res;
      res.reserve(s.size());
      locale loc("");
      if (upper_)
         transform(
            begin(s), end(s), back_inserter(res),
            [&loc](const char c) {
               return toupper(c, loc);
            }
         );
      else
         transform(
            begin(s), end(s), back_inserter(res),
            [&loc](const char c) {
              return tolower(c, loc);
            }
         );
      upper_ = !upper_;
      return std::move(res);
   }
};
  • la fonction dedoubler(), qui prend en paramètre une string et retourne une string contenant la concaténation du contenu de la chaîne d'origine avec elle-même.
string dedoubler(const string &s)
   { return s + s; }

Le programme lui-même va comme suit :

  • un pipeline de quatre opérations est construit;
  • un échantillon de données est choisi (ici, un vulgaire tableau de string);
  • le pipeline est nourri de cet échantillon (méthode feed(), à laquelle je passe un intervalle à demi ouvert);
  • le programme de test calcule combien d'échantillons ont été fournis au pipeline, et collecte les résultats de ses traitements jusqu'à ce que tous les échantillons aient été traités;
  • enfin, les résultats sont affichés à la console.

Ici, notez que l'affichage final aurait avantageusement pu s'exprimer par

for_each(begin(res), end(res), [&](const string &s) {
   cout << s << endl;
});

mais mon compilateur peine parfois avec les λ-expressions et plantait à la compilation, ce qui explique le recours à une répétitive for classique.

int main()
{
   Pipeline pipeline(
      majuscules, inverser, alterner_casse(), dedoubler
   );
   string donnees [] =
   {
      "J'aime", "mon", "chic prof!"
   };
   pipeline.feed(begin(donnees), end(donnees));
   int nadded = distance(begin(donnees), end(donnees)),
       nprocessed = 0;
   vector<string> res;
   for (auto v = pipeline.exiting<string>().extraire();
        res.insert(end(res), begin(v), end(v)),
           nprocessed += v.size(),
           nprocessed < nadded;
        v = pipeline.exiting<string>().extraire())
      ;
   for(auto itt = begin(res); itt != end(res); ++itt)
      cout << *itt << endl;
}

Le résultat de l'exécution de ce programme de test sera :

emia'jemia'j
NOMNOM
!forp cihc!forp cihc

Le code client (le programme de test) ne peut se fier sur le fait que la file sortante du pipeline soit vide pour déterminer que le traitement a été complété, du fait que les diverses opérations du pipeline peuvent varier en termes de vitesse et sont faites de manière asynchrone. Un canal de sortie vide peut simplement signifier que certaines des opérations sont plus longues que prévu.

Notez aussi que rien n'empêche un pipeline de contenir des opérations de filtrage, de telle sorte que le nombre d'échantillons sortant pourrait être inférieur au nombre d'échantillons sortants. De même, rien n'empêche une opération du pipeline de consommer un échantillon et d'en produire plusieurs en sortie. Ainsi, le code client – responsable d'avoir construit le pipeline – doit prévoir des mécanismes pour évaluer si le pipeline a terminé son travail, du moins s'il est important pour lui de le savoir.

Ici, toutes les opérations prennent une chaîne en paramètre et retournent une chaîne, donc le nombre d'échantillons en entrée sera égal au nombre d'échantillons en sortie une fois le traitement terminé.

Complément au standard – memory_ext.h

Le standard C++ 11 est une mise à jour importante du langage, mais malgré beaucoup de soin et d'attention, certains détails ont échappé aux membres du comité de standardisation.

L'un d'entre eux, tout simple, est qu'il existe un std::make_shared() pour instancier de manière sécuritaire et efficace un std::shared_ptr (pointeur intelligent avec sémantique de partage) mais qu'il n'existe pas de std::make_unique() pour faire l'équivalent avec un std::unique_ptr (pointeur intelligent avec sémantique de responsabilité exclusive).

memory_ext.h

Cela dit, rien ne nous empêche d'en écrire un nous-mêmes, ce que j'ai fait ici pour les versions d'arité zéro et un (celles dont j'avais besoin).

Si vous le souhaitez, vous pouvez évidemment ajouter des versions à deux paramètres ou plus, mais ne perdez pas trop de temps là-dessus du fait qu'une solution générale sera possible lorsque les compilateurs supporteront de manière plus étendue les templates variadiques.

#ifndef MEMORY_EXT_H
#define MEMORY_EXT_H
#include <memory>
template <class T>
   std::unique_ptr<T> make_unique()
      { return std::unique_ptr<T>(new T); }
template <class T, class A>
   std::unique_ptr<T> make_unique(A arg)
      { return std::unique_ptr<T>(new T(arg)); }
#endif

Notez que std::unique_ptr<T> est covariant sur la base du type T. Il est en effet possible d'écrire ce qui suit :

template <class T, class D>
   std::unique_ptr<T> creer()
   {
      return make_unique<D>();
   }

si D est une dérivé de T. Cette caractéristique est importante dans notre code à l'intérieur de cet article.

Pour une explication de la pertinence d'une fonction telle que make_unique<T>(), voir cet article de Herb Sutter.

Vous remarquerez dans le code plus bas que j'utilise des std::shared_ptr mais que je ne me sers pas de std::make_shared<T>() pour les générer. La raison est que le recours à std::make_shared<T>() force, pour des raisons techniques, le type T à exposer un constructeur public, or il s'avère que pour les classes en fonction desquelles j'utilise des std::shared_ptr dans le présent article, j'ai préféré conserver les constructeurs privés, ou garder privées les classes à construire et à partager.

Trait lockable

Sur ce site, des exemples de Mutex et verrous RAII sont disponibles, et offerts entre autres en version portable. Évidemment, C++ 11 offre de chic mécanismes de multiprogrammation, mais le compilateur que j'ai utilisé pour écrire le code présenté ici ne les supporte pas encore, donc les techniques « maison » ont dû être appliquées.

lockable.h

Dans le but d'étendre un peu les techniques susmentionnées, j'ai défini un trait nommé lockable_traits, applicable à un outil de synchronisation (par exemple un Mutex). Il sera donc possible de déduire d'un type d'outil de synchronisation le type de verrou à lui appliquer.

Évidemment, c'est une façon de faire, pas la façon. Procéder ainsi permet d'introduire de nouveaux outils de synchronisation dans un programme et de publier implicitement les mécanismes RAII pour s'en servir, mais cela réduit un peu la flexibilité du code client dans le cas où plusieurs variantes de synchronisation pourraient être rendues disponibles (verrou réentrant, verrou testable et autres).

#ifndef LOCKABLE_H
#define LOCKABLE_H
template <class>
   struct lockable_traits;
#endif

Un exemple concret de lockable_traits est donné dans Mutex.h, plus bas.

Traits de fonctions unaires

Avec C++ 11 vient une standardisation de traits sur les types, à travers la bibliothèque <type_traits>, ce qui remplace avantageusement plusieurs techniques classiques de métaprogrammation. Ces traits standards seront utilisés à quelques endroits dans le code présenté dans cet article.

unary_function_traits.h

Dans le cas de l'implémentation d'un pipeline proposée ici, j'ai surtout besoin de savoir deux choses sur les opérations à appliquer, soit le type de paramètre attendu par une opération et le type de la valeur retournée par cette opération.

En effet, un pipeline de opérations aura la forme suivante :

ce qui, pris séquentiellement, ressemblerait à :

où chaque est une zone de transit pour les échantillons et chaque est une transformation. Le code client détermine les , mais le pipeline doit valider que les intrants d'une opération peuvent être construits à partir des extrants de l'opération précédente. De même, le pipeline doit mettre en place les zones de transit pour que ceux-ci soient du type retourné par .

Pour cette raison, les traits argument_of et result_of seront utilisés et permettront de raisonner sur les types impliqués.

#ifndef UNARY_FUNCTION_TRAITS_H
#define UNARY_FUNCTION_TRAITS_H
template <class F>
   struct result_of
   {
      typedef typename
         F::result_type type;
   };
template <class R, class A>
   struct result_of<R (*)(A)>
   {
      typedef R type;
   };
template <class F>
   struct argument_of
   {
      typedef typename
         F::argument_type type;
   };
template <class R, class A>
   struct argument_of<R (*)(A)>
   {
      typedef A type;
   };
#endif

Le code d'argument_of déduit le type du paramètre d'une fonction unaire de sa signature et suppose qu'un foncteur unaire aura défini le type interne et public argument_type. Le code de result_of fait de même mais suppose plutôt d'un foncteur (unaire ou non) la présence d'un type interne et public result_type. Pour les fins du pipeline, seules les opérations unaires seront considérées.

Classe Mutex

J'utiliserai ici un Mutex portable au sens du code client (mais dont l'implémentation sous-jacente est spécifique à pour une plateforme donnée) exploitant sur les idiomes incopiable et pImpl. Bien qu'incopiable, le Mutex sera déplaçable, implémentant la sémantique de mouvement.

Mutex.h

Vous pouvez vous référer la l'article sur les Mutex portables pour des détails quant aux bases de l'implémentation proposée ici.

Notez que le verrou RAII pour les Mutex proposés ici est accessible à travers une spécialisation du trait lockable_traits applicable au type Mutex. J'ai fait un verrou simple ici, mais il aurait été facile d'ajouter plusieurs saveurs outre locker.

Pour que la technique soit pertinente, il faut bien sûr que les lockable_traits définis pour d'autres types d'outils de synchronisation expriment leurs outils privilégiées sous le nom locker et sous la même sémantique d'utilisation que celle proposée ici.

#ifndef MUTEX_H
#define MUTEX_H
#include "Incopiable.h"
#include <memory>
class Mutex
   : Incopiable
{
   class Impl;
   std::unique_ptr<Impl> p_;
   void swap(Mutex &);
   Mutex &raw() const volatile
      { return const_cast<Mutex&>(*this); }
public:
   Mutex();
   ~Mutex() noexcept;
   Mutex(Mutex &&);
   Mutex& operator=(Mutex &&);
   void obtenir() const volatile;
   void relacher() const volatile;
};
#include "lockable.h"
template <>
   struct lockable_traits<Mutex>
   {
      class locker
         : Incopiable
      {
         const volatile Mutex &mutex_;
      public:
         locker(const volatile Mutex &mutex)
            : mutex_(mutex)
         {
            mutex_.obtenir();
         }
         ~locker() noexcept
            { mutex_.relacher(); }
      };
   };
#endif

Notez que la qualification Incopiable aurait pu être omise ici du fait qu'au moins un des attributs d'un Mutex, le std::unique_ptr<Impl>, et lui-même incopiable (mais déplaçable).

Mutex.cpp

L'implémentation sous-jacente du Mutex est somme toute simple, et repose principalement sur de la délégation des services de la strate d'utilisation (Mutex) vers ceux de la strate d'implémentation (Mutex::Impl).

Notez que unique_ptr ne supporte pas vraiment la qualification volatile, qu'affiche fièrement la classe Mutex à travers ses services. Pour cette raison, une méthode privée raw() libérant this des qualifications const et volatile est utilisée pour fins de délégation des services de Mutex vers ceux de Mutex::Impl.

#include "Mutex.h"
#include <Windows.h>
class Mutex::Impl
{
   HANDLE h;
public:
   Impl()
      : h(CreateMutex(0,FALSE,0))
   {
   }
   ~Impl() noexcept
      { CloseHandle(h); }
   void obtenir() const volatile noexcept
      { WaitForSingleObject(h, INFINITE); }
   void relacher() const volatile noexcept
      { ReleaseMutex(h); }
};
void Mutex::swap(Mutex &m)
   { std::swap(p_, m.p_); }
Mutex::Mutex()
   : p_(new Impl)
{
}
Mutex::~Mutex() noexcept
   { }
Mutex::Mutex(Mutex &&m)
   : p_(std::move(m.p_))
{
   m.p_ = nullptr;
}
Mutex& Mutex::operator=(Mutex &&m)
{
   swap(m);
   return *this;
}
void Mutex::obtenir() const volatile
   { raw().p_->obtenir(); }
void Mutex::relacher() const volatile
   { raw().p_->relacher(); }

Classe once_signal

J'ai construit mon implémentation de pipeline de manière à ce qu'un Pipeline donné ne puisse être démarré qu'une seule fois – il est toutefois possible de démarrer plusieurs pipelines, ce qui implique que redémarrer un pipeline donné puisse être simulé par la destruction du pipeline puis par la reconstruction d'un autre pipeline avec la même séquence d'opérations que celle prise en charge par le pipeline original.

once_signal.h

J'ai implémenté le concept d'un signal ne pouvant être donné qu'une seule fois à l'aide d'un booléen (en pratique, il devrait s'agir d'un atomic<bool>) qui est initialisé à faux et ne peut devenir que vrai – son opération set() est idempotente.

Notez que cette classe n'est pas exempte de conditions de course pour un code client qui choisirait de faire quelque chose comme :

once_signal sig;
// ...
if(!sig.test())
{
   sig.set();
   f();
}

du fait que deux threads pourraient appeler sig.test() concurremment et se trouver à appeler f() tous deux, même s'il est probable dans un tel cas que f() contienne du code qui ne devrait appliquer une seule fois. Il importe donc d'appuyer l'utilisation d'un once_signal avec d'autres mécanismes (ce que ne n'ai pas fait ici, par pure paresse).

#ifndef ONCE_SIGNAL_H
#define ONCE_SIGNAL_H
#include "Incopiable.h"
class once_signal
   : Incopiable
{
public:
   typedef bool value_type;
private:
   value_type val_;
public:
   once_signal()
      : val_(false)
   {
   }
   void set()
      { val_ = true; }
   value_type test() const noexcept
      { return val_; }
};
#endif

Pourquoi ai-je écrit cette classe? Parce que mon pipeline exposait un grand nombre de constructeurs (pour des raisons que j'expliquerai plus bas) et que je ne voulais pas oublier, dans l'un ou l'autre d'entre eux, d'initialiser mon indicateur de pipeline démarré ou non correctement. En utilisant le type once_signal plutôt qu'un booléen, le problème potentiel s'est réglé de manière implicite.

Classe zone_transit

L'idée d'une zone de transit pour assurer une communication symchronisée entre deux threads constitue une structure de données classique en multiprogrammation.

zone_transit.h

Dans ce cas bien précis, du fait que chaque zone de transit a un seul producteur et un seul consommateur, il aurait été possible d'utiliser une structure de données sans synchronisation. Cependant, j'y suis allé pour la simplicité avec une structure de données générique à l'interface simple, comme le montre l'exemple de code à droite.

La généricité est importante ici puisqu'il est possible, dans le pire cas, que chaque zone de transit d'un même pipeline opère sur un type distinct; pensez à un pipeline qui prendrait une liste de mots, la transformerait en une liste de catégories de mots (verbes, noms, déterminants, etc.) puis transformerait cette liste en paires {catégorie,nombre d'occurrences}.

Vous remarquerez que, du fait que mon implémentation insère en fin de conteneur et ne permet que l'extraction de tous les éléments d'un coup, j'utilise un vecteur comme substrat d'entreposage. Si vous souhaitez permettre d'extraire un élément à la fois, envisagez un std::deque.

Notez toutefois que des extractions de granularité trop fine peuvent mener à une synchronisation trop fréquente de la zone de transit, et ainsi à une dégradation des performances de votre conteneur.

#ifndef ZONE_TRANSIT_H
#define ZONE_TRANSIT_H
#include "Mutex.h"
#include "lockable.h"
#include <vector>
#include <algorithm>
#include <memory>
template <class T, template <class, class> class C = std::vector, class A = std::allocator<T>>
   class zone_transit
   {
      C<T,A> data_;
      Mutex m_;
      typedef typename
         lockable_traits<Mutex>::locker lock_type;
   public:
      void ajouter(const T &elem)
      {
         data_.push_back(elem);
      }
      template <class It>
         void ajouter(It debut, It fin)
         {
            using std::end;
            data_.insert(end(data_), debut, fin);
         }
      C<T,A> extraire()
      {
         auto temp = data_;
         data_.clear();
         return temp;
      }
      void ajouter(const T &elem) volatile
      {
         lock_type lock(m_);
         const_cast<zone_transit&>(*this).ajouter(elem);
      }
      template <class It>
         void ajouter(It debut, It fin) volatile
         {
            lock_type lock(m_);
            const_cast<zone_transit&>(*this).ajouter(debut, fin);
         }
      C<T,A> extraire() volatile
      {
         lock_type lock(m_);
         return const_cast<zone_transit&>(*this).extraire();
      }
   };
#endif

Classe Pipeline

Nous arrivons maintenant à l'implémentation du pipeline en tant que telle. J'ai choisi d'y aller d'une classe concrète, Pipeline, qui dissimulera les validations de correspondance de types et la gestion des zones de transit « sous la couverture », de sorte que le code client en sera simplifié.

Pipeline.h

Une instance de Pipeline sera essentiellement constituée d'une liste de tâches (des objets encapsulant chacune une transformation) et de canaux (des zones de transit).

Le premier problème auquel nous ferons face sera celui des types impliqués. En effet, supposons un pipeline de la forme ; les canaux sont les alors que les transformations sont les . Sachant cela, il n'est pas clair que les types de , et soient les mêmes, tout comme il n'est pas clair que les types de et soient les mêmes.

Pourtant, il nous faut les entreposer dans un même conteneur, et les conteneurs de C++ sont typés. Il faut donc placer dans tache_ des éléments de même type (le type Pipeline::Tache), tout comme il faut placer dans canal_ des éléments de même type (le type Pipeline::Canal).

Nous verrons plus bas, en examinant les classes Tache et Canal, comment nous y arriverons.

#ifndef PIPELINE_H
#define PIPELINE_H
#include "once_signal.h"
#include "zone_transit.h"
#include "unary_function_traits.h"
#include "memory_ext.h"
#include <vector>
#include <memory>
#include <type_traits>
class Pipeline
{
public:
   class Tache;
   class Canal;
private:
   std::vector<std::unique_ptr<Tache>> tache_;
   std::vector<std::shared_ptr<Canal>> canal_;
   once_signal started_;
   void start();

Les méthodes add_step() et add_channel() ont pour rôle d'aider à construire le pipeline.

Dans le cas de add_step(), qui prend une opération du type générique F en paramètre, on parle d'ajouter une transformation à la liste de celles prises en charge par le pipeline. Ce paramètre est important du fait que si F est un pointeur de fonction, alors il faut savoir l'adresse de la fonction à exécuter, alors que si F est un foncteur, il est possible que celui-ci ait des états à tenir à jour.

Dans le cas de add_channel(), aucun paramètre n'est requis mais le type T des éléments à placer dans la zone de transit doit être indiqué à l'appel. C'est le pipeline lui-même qui instanciera une zone_transit<T> vide lors d'un appel de cette méthode.

Ces services sont qualifiés privés du fait que la cohérence entre les canaux et les tâches, en nombre ou en termes de types impliqués, est essentielle au bon fonctionnement du pipeline.

public:
   typedef std::vector<
      std::unique_ptr<Tache>
   >::size_type size_type;
   ~Pipeline() noexcept;
private:
   template <class F>
      void add_step(F);
   template <class T>
      void add_channel();

Pour faciliter l'écriture du code, j'ai rédigé un service de classe générique validate_chaining(F0,F1) qui ne compilera que s'il est possible d'enchaîner une transformation de type F0 par une transformation de type F1.

Ce raisonnement se base sur les types impliqués : il faut que la sortie de F0 puisse être convertie implicitement dans l'entrée de F1.

   template <class F0, class F1>
      static void validate_chaining(F0, F1)
      {
         using std::is_convertible;
         static_assert(
            is_convertible<
               typename result_of<F0>::type,
               typename argument_of<F1>::type
            >::value, "Chaining problem"
         );
      }

Par exemple si F0 est string (*)(const char*) et si F1 est double (*)(string), alors validate_chaining(F0,F1) compilera car la sortie de F0, une string, peut être passée en entrée à F1.

En retour, si F0 est double (*)(string) alors que F1 est string (*)(const char*), alors validate_chaining(F0,F1) ne compilera pas, car la sortie de F0, un double, ne peut être implicitement convertie en l'entrée de F1, un const char*.

Maintenant, examinons le code par lequel se construit l'enchaînement de transformations, soit l'ensemble de méthodes build_chaining().

Faute d'avoir un compilateur supportant les templates variadiques, j'ai écrit des versions de cette méthode allant jusqu'à concurrence de dix transformations, en construisant un enchaînement de transformations à partir d'un enchaînement de transformations. Avec des templates variadiques, dont vous trouverez un exemple ici, ce code fondra comme neige au soleil.

   template <class T0>
      void build_chaining(T0 t0)
         { add_step(t0); }
   template <class T0, class T1>
      void build_chaining(T0 t0, T1 t1)
      {
         validate_chaining(t0, t1);
         add_step(t0);
         build_chaining(t1);
      }
   template <class T0, class T1, class T2>
      void build_chaining(T0 t0, T1 t1, T2 t2)
      {
         validate_chaining(t0, t1);
         add_step(t0);
         build_chaining(t1, t2);
      }
   template <class T0, class T1, class T2, class T3>
      void build_chaining(
         T0 t0, T1 t1, T2 t2, T3 t3
      )
      {
         validate_chaining(t0, t1);
         add_step(t0);
         build_chaining(t1, t2, t3);
      }
   template <class T0, class T1, class T2, class T3, class T4>
      void build_chaining(
         T0 t0, T1 t1, T2 t2, T3 t3, T4 t4
      )
      {
         validate_chaining(t0, t1);
         add_step(t0);
         build_chaining(t1, t2, t3, t4);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5>
      void build_chaining(
         T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
         T5 t5
      )
      {
         validate_chaining(t0, t1);
         add_step(t0);
         build_chaining(t1, t2, t3, t4, t5);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5, class T6>
      void build_chaining(
         T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
         T5 t5, T6 t6
      )
      {
         validate_chaining(t0, t1);
         add_step(t0);
         build_chaining(t1, t2, t3, t4, t5, t6);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5, class T6, class T7>
      void build_chaining(
         T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
         T5 t5, T6 t6, T7 t7
      )
      {
         validate_chaining(t0, t1);
         add_step(t0);
         build_chaining(t1, t2, t3, t4, t5, t6, t7);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5, class T6, class T7, class T8>
      void build_chaining(
         T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
         T5 t5, T6 t6, T7 t7, T8 t8
      )
      {
         validate_chaining(t0, t1);
         add_step(t0);
         build_chaining(t1, t2, t3, t4, t5, t6, t7, t8);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5, class T6, class T7, class T8, class T9>
      void build_chaining(
         T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
         T5 t5, T6 t6, T7 t7, T8 t8, T9 t9
      )
      {
         validate_chaining(t0, t1);
         add_step(t0);
         build_chaining(t1, t2, t3, t4, t5, t6, t7, t8, t9);
      }

Tout comme les méthodes build_chaining() présentées plus haut, les constructeurs supportent jusqu'à dix transformations (mais on pourrait en ajouter plus au besoin). Évidemment, une solution basée sur les templates variadiques serait plus souple et plus élégante.

Un constructeur de Pipeline met en place le canal d'entrée puis construit l'enchaînement des transformations. Suite à un constructeur de Pipeline de transformations, le nombre de canaux sera donc .

public:
   template <class T0>
      Pipeline(T0 t0)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0);
      }
   template <class T0, class T1>
      Pipeline(T0 t0, T1 t1)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0,t1);
      }
   template <class T0, class T1, class T2>
      Pipeline(T0 t0, T1 t1, T2 t2)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0,t1,t2);
      }
   template <class T0, class T1, class T2, class T3>
      Pipeline(T0 t0, T1 t1, T2 t2, T3 t3)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0,t1,t2,t3);
      }
   template <class T0, class T1, class T2, class T3, class T4>
      Pipeline(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0,t1,t2,t3,t4);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5>
      Pipeline(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
               T5 t5)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0,t1,t2,t3,t4,t5);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5, class T6>
      Pipeline(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
               T5 t5, T6 t6)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0,t1,t2,t3,t4,t5,t6);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5, class T6, class T7>
      Pipeline(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
               T5 t5, T6 t6, T7 t7)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0,t1,t2,t3,t4,t5,t6,t7);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5, class T6, class T7, class T8>
      Pipeline(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
               T5 t5, T6 t6, T7 t7, T8 t8)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0,t1,t2,t3,t4,t5,t6,t7,t8);
      }
   template <class T0, class T1, class T2, class T3, class T4,
             class T5, class T6, class T7, class T8, class T9>
      Pipeline(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4,
               T5 t5, T6 t6, T7 t7, T8 t8, T9 t9)
      {
         add_channel<typename argument_of<T0>::type>();
         build_chaining(t0,t1,t2,t3,t4,t5,t6,t7,t8,t9);
      }

Les méthodes entering() et exiting() sont typées sur la base du type de données dans la zone de transit entrante et sortante (respectivement) du pipeline.

Leur implémentation est expliquée un peu plus bas. Évidemment, leur signature implique que le code client connaisse le type des données entrantes, et qu'il connaisse aussi le type résultant de la dernière transformation du pipeline.

Ces méthodes n'ont aucun sens sur un Pipeline vide, mais cet état n'est pas possible à travers nos constructeurs.

   template <class T>
      volatile zone_transit<T> &entering();
   template <class T>
      volatile zone_transit<T> &exiting();

Alimenter le pipeline correspond à ajouter des éléments dans la zone de transit menant vers la première transformation (techniquement : ajouter à pour alimenter ).

La classe Pipeline proposée ici offre deux services en ce sens, soit feed(elem) pour ajouter un seul élément et feed(debut,fin) pour ajouter d'un coup tous les éléments d'un intervalle à demi ouvert. Évidemment, la version opérant sur un intervalle est beaucoup plus rapide que celle n'opérant que sur un seul élément, les coûts associés à la synchronisation de la zone de transit se trouvant amortis sur l'ensemble des éléments ajoutés.

   template <class T>
      void feed(T elem)
      {
         if (!started_.test()) start();
         entering<T>().ajouter(elem);
      }
   template <class It>
      void feed(It debut, It fin)
      {
         typedef typename
            std::iterator_traits<It>::value_type value_type;
         if (!started_.test()) start();
         entering<value_type>().ajouter(debut, fin);
      }
};

Il aurait été plus rapide en général de séparer le démarrage (appel à la méthode start() et tests sur started_) de l'alimentation. J'ai choisi de joindre les deux dans le but d'alléger le code client, sans plus : un Pipeline alimenté au moins une fois sera implicitement démarré.

L'abstraction représentant une transformation dans une instance de Pipeline sera la classe interne Tache. Toutefois, Tache elle-même sera surtout une façade devant des transformations spécifiques.

Remarquez la fonction globale amie prendre_en_charge(). C'est cette fonction qui, appelée par un thread de la plateforme, transformera une abstraction pure en une tâche et lui permettra de s'exécuter.

La technique appliquée ici ressemble un peu à celle de l'idiome pImpl ou de la classe peu_importe; si vous souhaitez explorer un peu plus par vous-même, voilà des pistes pour vous aider à démarrer votre réflexion.

class Pipeline::Tache
{
   friend void prendre_en_charge(void *);

Dans une Tache se trouvent deux niveaux de classes internes.

Le premier niveau, Executable, représente les détails propres à la gestion d'un thread :

  • type interne Rep (détaillé dans Pipeline.cpp);
  • méthode start() pour démarrer la tâche dans le pipeline; et
  • membre die_ pour signaler une demande d'arrêt).

On y trouve aussi les canaux entrants et sortants (in_ et out_) pris sur une base abstraite (non-typée), et un service polymorphique executer() pour lancer à proprement dit l'exécution de cette partie du pipeline.

   class Executable
   {
      bool die_;
      struct Rep;
      std::unique_ptr<Rep> rep_;
      std::shared_ptr<Pipeline::Canal> in_, out_;
   public:
      std::shared_ptr<Pipeline::Canal> in()
         { return in_; }
      std::shared_ptr<Pipeline::Canal> out()
         { return out_; }
      void connect(
         std::shared_ptr<Pipeline::Canal> in,
         std::shared_ptr<Pipeline::Canal> out
      )
      {
         in_ = in;
         out_ = out;
      }
      bool should_die() const noexcept
         { return die_; }
      void die()
         { die_ = true; }
      Executable();
      virtual void executer() = 0;
      virtual ~Executable() noexcept;
      void start();
   };

Le deuxième niveau de classe interne, ExecutableImpl<F>, est sans doute le plus important. À tout le moins, c'est le plus actif, connaissant l'opération unaire à appliquer dans sa strate du pipeline (f_, de type F).

Sa méthode executer() réalise le traitement d'une étape du pipeline à proprement dit :

  • elle identifie les types entrant et sortant (sans qualifications; nous opérons par valeur dans le pipeline, par souci de simplicité);
  • elle obtient les canaux entrant et sortant, de manière typée, ce qu'elle peut faire puisqu'elle connaît les types impliqués;
  • par la suite, elle itère tant que le pipeline est actif, en consommant des données en entrée, en leur appliquant la transformation f_ et en déposant les résultats dans le canal de sortie.
   template <class F>
      struct ExecutableImpl
         : Executable
      {
         F f_;
         void executer()
         {
            using std::vector;
            using std::transform;
            using std::back_inserter;
            using std::begin;
            using std::end;
            using std::remove_cv;
            using std::remove_reference;
            typedef typename
               remove_reference<
                  typename remove_cv<
                     typename result_of<F>::type
                  >::type
               >::type result_type;
            typedef typename
               remove_reference<
                  typename remove_cv<
                     typename argument_of<F>::type
                  >::type
               >::type argument_type;
            auto &entree = in()->zone<argument_type>();
            auto &sortie = out()->zone<result_type>();
            while (!should_die())
            {
               auto to_process = entree.extraire();
               if (!to_process.empty())
               {
                  vector<result_type> res;
                  res.reserve(to_process.size());
                  transform(
                     begin(to_process), end(to_process),
                     back_inserter(res), [&](argument_type arg) {
                        return f_(arg);
                     }
                  );
                  sortie.ajouter(begin(res), end(res));
               }
            }
         }
         ExecutableImpl(F f)
            : f_(f)
         {
         }
      };

Ne reste plus qu'à parler du type Pipeline::Tache en tant que tel.

En fait, puisque ce type n'est pas générique (et ne peut l'être, servant de type général pour les tâches dans le pipeline), il ne peut pas seul traiter les divers cas possibles de transformations susceptibles de contribuer au pipeline dans son ensemble.

Pour cette raison, il garde un pointeur sur un Executable, offre une interface simple pour le démarrer et l'exécuter, et permet de lui indiquer quels canaux (abstraits) utiliser. Le type Executable est polymorphique mais n'est pas générique; en retour, les enfants d'Executable, instances de ExecutableImpl<F> pour divers types F, sont génériques et connaissent précisément les types impliqués, ce qui leur permet de réaliser une prise en charge typée des opérations.

   std::unique_ptr<Executable> p_;
public:
   template <class F>
      Tache(F f)
         : p_(new ExecutableImpl<F>(f))
      {
      }
   void start()
      { p_->start(); }
   void executer()
      { p_->executer(); }
   void connect(
      std::shared_ptr<Pipeline::Canal> in,
      std::shared_ptr<Pipeline::Canal> out
   )
   {
      p_->connect(in, out);
   }
};

L'approche mise en place pour Pipeline::Canal est similaire à cette appliquée dans le cas de Pipeline::Tache.

L'abstraction de haut niveau d'un Canal est transit, type qui n'a de polymorphique que le destructeur. Sa seule raison d'être est de déposer n'importe quelle zone_transit<T> dans un Canal.

Pour ne pas forcer une intrusion dans zone_transit<T>, ce qui serait nécessaire si nous voulions lui imposer Pipeline::Canal::transit comme parent, nous utilisons plutôt un enfant typé, transit_impl<T>, qui n'a pour seul membre qu'une zone_transit<T>. Une forme de combinaison d'héritage et de composition externes pour remplacer un héritage interne intrusif.

Le reste n'est que mécanique. Un Canal contient un Canal::transit dont dérivent divers Canal::transit_impl<T>. Le code client pourra transtyper efficacement un Canal::transit en Canal::transit_impl<T> s'il connaît le type T.

class Pipeline::Canal
   : Incopiable
{
   struct transit { virtual ~transit() noexcept {} };
   template <class T>
      struct transit_impl
         : transit
      {
         volatile zone_transit<T> zone;
      };
   std::unique_ptr<transit> p_;
   Canal()
      { }
public:
   template <class T>
      static std::shared_ptr<Canal> creer()
      {
         using std::remove_cv;
         using std::remove_reference;
         typedef typename
            remove_reference<
               typename remove_cv<T>::type
            >::type type;
         using std::shared_ptr;
         using std::unique_ptr;
         shared_ptr<Canal> p(new Canal);
         p->p_ = make_unique<transit_impl<type>>();
         return std::move(p);
      }
   template <class T>
      volatile zone_transit<T> &zone()
         { return static_cast<transit_impl<T>&>(*p_).zone; }
};

Reste à voir comment sont implémentés les quelques services restants de Pipeline. Certains, génériques, sont détaillés dans Pipeline.h, alors que les autres apparaissent dans Pipeline.cpp.

La méthode add_step(F) procède en quatre temps :

  • elle s'assure que le pipeline n'est pas encore démarré pour que son comportement demeure cohérent;
  • elle capture le dernier canal créé jusqu'ici, qui sera la zone de transit entrante de la nouvelle tâche;
  • elle ajoute la nouvelle étape au pipeline, en instanciant une TacheImpl<F>; puis
  • elle ajoute un nouveau canal au pipeline, typé sur le type retourné par F. Ce canal sera la zone de transit sortante pour la nouvelle tâche.
#include <cassert>
template <class F>
   void Pipeline::add_step(F fct)
   {
      using std::unique_ptr;
      assert(!started_.test());
      auto in = canal_.back();
      tache_.push_back(
         make_unique<Tache>(fct)
      );
      add_channel<typename result_of<F>::type>();
      auto out = canal_.back();
      tache_.back()->connect(in, out);
   }

L'ajout d'un Pipeline::Canal, réalisé à travers la méthode add_channel(), est une opération typée mais sans paramètre. C'est au pipeline lui-même de créer les zones de transit, pas au code client. Ceci contraste avec les opérations (add_step(F), plus haut) qui, elles, sont la responsabilité du code client – le pipeline, lui, est responsable de leur organisation.

template <class T>
   void Pipeline::add_channel()
   {
      assert(!started_.test());
      canal_.push_back(Canal::creer<T>());
   }

Enfin, des services permettant au code client d'obtenir les zones de transit entrante et sortante du pipeline sont offertes. Notez que le code client doit au moins connaître les types des zones de transit demandées pour être en mesure de solliciter ces services correctement.

template <class T>
   volatile zone_transit<T> &Pipeline::entering()
      { return canal_.front()->zone<T>(); }
template <class T>
   volatile zone_transit<T> &Pipeline::exiting()
      { return canal_.back()->zone<T>(); }
#endif

Examinons maintenant les services associés à un Pipeline mais qui sont définis à même un fichiers source distinct.

Pipeline.cpp

Dans le but de dissocier un peu plus la déclaration du Pipeline (dans Pipeline.h) d'une plateforme sous-jacente ou de l'autre, j'ai préféré déclarer une fonction générale, prendre_en_charge(void*), sans affiliation particulière avec une plateforme ou l'autre, amie de Tache plutôt que de faire de même avec une fonction dont la signature est associée de près avec la plateforme (execute_task(void*) dans ce cas-ci). C'est une petite touche, et ça ne coûte rien, mais c'est utile dans une optique d'entretien du code.

Le recours à l'amitié ici est dû au fait qu'un accès à Executable est requis pour exécuter effectivement une étape du pipeline, or cette classe est privée dans Tache.

#include "Pipeline.h"
#include "memory_ext.h"
#include <cassert>
#include <algorithm>
#include <memory>
#include <windows.h>
using std::begin;
using std::end;
using std::for_each;
using std::unique_ptr;
void prendre_en_charge(void *p)
{
   auto tache = static_cast<Pipeline::Tache::Executable*>(p);
   tache->executer();
}
unsigned long __stdcall execute_task(void *p)
{
   prendre_en_charge(p);
   return {};
}

Le destructeur de Pipeline est vide, mais nécessaire. La raison est subtile :

  • dans la déclaration de Pipeline, nous manipulons des conteneurs de types a priori incomplets, tels que Pipeline::Tache et Pipeline::Canal;
  • ces entités sont manipulées à travers des pointeurs intelligents (std::unique_ptr);
  • si le destructeur de Pipeline n'était pas déclaré, alors le compilateur chercherait à définir automatiquement le code du destructeur implicite de Pipeline dès Pipeline.h, or il n'est pas possible de le faire à cet endroit dû aux pointeurs intelligents sur des types incomplets (leurs destructeurs chercheraient à appeler les destructeurs de leurs pointés, dont on ne sait pas encore à ce stade s'ils sont accessibles);
  • en déclarant le destructeur de Pipeline dans Pipeline.h mais en ne le définissant que dans Pipeline.cpp, le compilateur doit attendre avant de générer le code de Pipeline::~Pipeline() et ce code sera alors correct.
Pipeline::~Pipeline() noexcept
   { }

La méthode start() démarre chaque étape du pipeline, dans l'ordre.

Notez qu'il y a condition de course ici dans la phase entre started_.test() et started_.set(). Je l'ai laissée là parce qu'elle ne dérangeait pas vraiment mon programme de test quelque peu banal, mais en pratique il faudrat s'en occuper. Comment règleriez-vous le problème?

void Pipeline::start()
{
   assert(!started_.test());
   started_.set();
   for_each(
      begin(tache_), end(tache_),
      [&](unique_ptr<Tache> &p) {
         p->start();
      }
   );
}

La classe Rep, interne à Tache::Executable (oui, on parle de quatre niveaux de classes imbriquées ici) sert de représentation simplifiée pour un thread de la plateforme sous-jacente.

Ses responsabilités sont limitées : permettre d'attendre la fin du thread, entreposer sa représentation, démarrer un thread pour un Executable donné, et faire un peu de nettoyage.

Normalement, j'aurais fait en sorte qu'une classe comme Rep s'assure, dans son destructeur, que le signal de fin soit envoyé au thread et qu'une attente de la fin effective de son exécution soit faite. Ici, par contre, du fait que je n'entrepose par un pointeur sur l'Executable associé à un thread dans Rep, j'ai choisi de laisser cette responsabilité à d'autres.

struct Pipeline::Tache::Executable::Rep
{
   HANDLE h;
   Rep()
      : h(INVALID_HANDLE_VALUE)
   {
   }
   void wait()
      { WaitForSingleObject(h, INFINITE); }
   void start(Pipeline::Tache::Executable *p)
   {
      h = CreateThread(0,0,execute_task,p,CREATE_SUSPENDED,0);
      assert(h != INVALID_HANDLE_VALUE);
      ResumeThread(h);
   }
   ~Rep() noexcept
      { CloseHandle(h); }
};

Le constructeur par défaut d'Executable est défini ailleurs que le lieu de sa déclaration, pour les mêmes raisons que celles invoquées pour le destructeur de Pipeline, plus haut.

Notez que rep_ n'a pas à être explicitement initialisé à nullptr du fait que le constructeur par défaut d'un pointeur intelligent fait déjà ce travail.

Pipeline::Tache::Executable::Executable()
   : die_(false)
{
}

Le destructeur d'Executable s'assure d'émettre au thread qui lui est associé un signal de fin et d'attendre la fin effective du thread avant de mourir lui-même (et d'entraîner la représentation rep_ avec lui). On ne veut pas de threads sauvages s'amusant avec les ressources du système, après tout.

Pipeline::Tache::Executable::~Executable() noexcept
{
   if (rep_)
   {
      die();
      rep_->wait();
   }
}

Enfin, le démarrage d'un Executable implique la construction d'une représentation (un Rep) et l'invocation de son service start() appliqué à l'Executable lui-même.

void Pipeline::Tache::Executable::start()
{
   assert(!rep_);
   rep_ = make_unique<Rep>();
   rep_->start(this);
}

En espérant que le tout vous soit utile...


Valid XHTML 1.0 Transitional

CSS Valide !