Le pipeline est un schéma de conception bien connu dans le monde du parallélisme. Il s'exprime comme suit :
Un pipeline de transformations sera efficace s'il est plein, et si les diverses transformations sont de complexité semblable. Ainsi, si la cardinalité de l'ensemble d'échantillons à traiter est grande, alors :
Il est bien sûr possible de construire des pipelines ad hoc pour résoudre des problèmes de parallélisme spécifiques. Je vous propose ici une implémentation générique limitée à des transformations unaires (un paramètre, type de retour non-void), qu'il s'agisse de fonctions ou de foncteurs.
Certaines généralités sont présumées connues de votre part dans ce qui suit :
Vous trouverez ici deux versions du pipeline générique :
Sans surprises, la version C++ 17 est plus concise et plus simple. Si votre compilateur est à jour, c'est donc la version à privilégier.
La version qui suit est une implémentation d'un pipeline générique reposant sur les outils et idiomes propres à C++ 17. Pour une version se limitant aux outils et idiomes de C++ 03, voir plus bas.
Le programme suivant nous servira de programme de test pour un pipeline se voulant générique.
Principal.cpp | |
---|---|
Pour les besoins de cet exemple, nous limiterons nos inclusions à des en-têtes standards, outre Pipeline.h lui-même qui nous donnera accès à la classe Pipeline située au coeur de nos préoccupations. |
|
Nous appliquerons plusieurs opérations dans notre exemple de pipeline.
Ces opérations seront, pour les besoins de la cause, une combinaison de fonctions, de foncteurs et de λ. L'une de ces opérations sera la fonction majuscules(), à droite, qui consommera une chaîne de caractères et retournera une chaîne équivalente mais dont les caractères sont tous des majuscules. Utiliser la même instance de locale tout au long du processus de transformation est une économie importante en termes de temps d'exécution (le constructeur d'un std::locale est une opération dispendieuse). |
|
Une autre de ces opérations sera la fonction inverser(), qui consommera une chaîne de caractères et retournera une chaîne équivalente mais dont les caractères apparaîtront dans l'ordre inverse de celui d'origine. Notez que j'ai utilisé l'opérateur , pour séparer les deux expressions que sont l'appel à std::reverse() et le retour de la chaîne s ainsi modifiée, résultant en une seule expression (composite) plutôt que deux. |
|
Le foncteur alterner_casse consomme chaque fois une chaîne de caractères, et produit en alternance une version en majuscules ou en minuscules de cette chaîne. Il serait possible de simplifier ce foncteur, mais le résultat serait probablement moins rapide à l'exécution que ne l'est l'original (celui présenté ici). |
|
La fonction dedoubler() consomme une chaîne de caractères et retourne une chaîne de caractères contenant deux occurrences congituës de la chaîne originale. |
|
Enfin, la fonction compter_lettres() retourne le nombre de caractères dans la chaîne de caractères passée en paramètre. Cette version est plus que banale, mais une variante simple prendrait un critère pred en paramètre et compterait chaque caractère c tel que pred(c). |
|
Il nous reste à examiner le fonctionnement du pipeline en tant que tel, de même que le code de test que j'ai utilisé pour réaliser une validation sommaire de cette mécanique.
Pour alléger l'écriture de la création d'un pipeline à partir d'une séquence d'opérations, j'utiliserai la fonction générique make_pipeline() dont le premier paramètre sera un atomic<bool>& représentant le signal de fin de l'alimentation du pipeline et dont les paramètres subséquents seront les opérations à installer dans le pipeline, dans l'ordre (parce que les opérations peuvent ne pas être commutatives). Le recours à un booléen atomique n'est pas anodin, du fait qu'un accès sans synchronisation ni atomicité à une variable de manière concurrente par plusieurs threads, incluant au moins l'un d'eux en écriture, mènerait à un comportement indéfini. Il faut aussi comprendre que, malgré son nom, cette variable ne signifie pas que les threads doivent se terminer dans l'immédiat, mais bien que le pipeline est informé que sa première étape ne sera plus alimentée. Conséquemment, lorsque celle-ci aura complété de traiter les données lui ayant été soumises, elle pourra signaler à la suivante que celle-ci ne sera plus alimentée non plus, et ainsi de suite. |
|
Le code de test se présente comme suit :
Pour le programme à droite, l'exécution mènera à l'affichage suivant :
... ce qui, je pense, est conforme aux attentes. |
|
L'idée d'une zone de transit pour assurer une communication symchronisée entre deux threads constitue une structure de données classique en multiprogrammation.
zone_transit.h | |
---|---|
Dans ce cas bien précis, du fait que chaque zone de transit a un seul producteur et un seul consommateur, il aurait été possible d'utiliser une structure de données sans synchronisation. Cependant, j'y suis allé pour la simplicité avec une structure de données générique à l'interface simple, comme le montre l'exemple de code à droite. La généricité est importante ici puisqu'il est possible, dans le pire cas, que chaque zone de transit d'un même pipeline opère sur un type distinct; pensez à un pipeline qui prendrait une liste de mots, la transformerait en une liste de catégories de mots (verbes, noms, déterminants, etc.) puis transformerait cette liste en paires {catégorie,nombre d'occurrences}. Vous remarquerez que, du fait que mon implémentation insère en fin de conteneur et ne permet que l'extraction de tous les éléments d'un coup, j'utilise un vecteur comme substrat d'entreposage. Si vous souhaitez permettre d'extraire un élément à la fois, envisagez un std::deque. Notez toutefois que des extractions de granularité trop fine peuvent mener à une synchronisation trop fréquente de la zone de transit, et ainsi à une dégradation des performances de votre conteneur. Il est intéressant d'examiner brièvement la stratégie RAII de synchronisation des accès à l'attribut data. Voici comment notre implémentation procède :
|
|
Il nous reste à voir comment fonctionne le pipeline en soi, et comment s'exprime la mécanique qui le sous-tend.
Pipeline.h | |
---|---|
Ce pipeline reposera strictement sur des outils standards et d'autres construits à partir d'outils standards et appliquant des idiomes de programmation et des schémas de conception connus. C'est l'une des qualités de cette version : elle est pleinement portable, n'ayant aucune dépendance envers quelque plateforme ou système d'exploitation spécifique que ce soit. |
|
L'éventuel arrêt de l'exécution du pipeline sera provoqué par une cascade d'événements :
Lorsque deux étapes et interagissent, alimente , donc peut signaler à que ce dernier ne sera plus alimenté, alors que doit pouvoir lire ce signal sans toutefois être en mesure de le modifier. Pour représenter ceci, manipulera un std::atomic<bool> mais exposera à un read_only_signal sur ce booléen atomique. Il serait évidemment simple de généraliser cette pratique à d'autres types que bool.
|
|
La classe Pipeline<T> est générique pour une raison technique intéressante :
|
|
L'insertion d'un canal de communication entre les étapes dépend du type des données qui transitent à travers le pipeline. Si les opérations du pipeline sont , et si est le type du paramètre passé à , alors le type passé à sera le type du résultat de appliqué à un , le type du paramètre passé à sera le type du résultat de appliqué à un , etc. Cela dit, les canaux qui servent de transit entre les étapes du pipeline doivent avoir un type commun, du moins si nous souhaitons placer les données sortantes d'une étape destinées à devenir les données entrantes d'une étape quelque part pour éviter le bloquer l'une en attendant que l'autre soit prête. Le problème bien sûr est que chacun des canaux est une zone_transit<U> pour un type de donnée (soit ), alors qu'un conteneur en C++ est générique sur la base du type de données qu'il contient (p. ex. : un vector<int> contient des int). Pour placer tous les canaux dans un même conteneur, nous procédons ici à une forme ciblée d'effacement de type :
|
|
Un peu comme les canaux placés entre les étapes sont encapsulés par la classe Canal, où chaque classe transit_impl<U> est abstraite par un parent transit réduit à l'extrême, les étapes sont représentées par la classe Tache, non-générique, dont :
Le réel travail des étapes prises une à une est réalisé par des instances de chacune des classes TacheImpl<F,A> (plus bas), mais c'est leur parent commun Tache qui assure l'encadrement de leur travail. Quelques remarques à propos de la méthode Tache::start() :
|
|
Chaque étape plus spécifique est représentée par une instance de TacheImpl<F,A> où :
Toutes les instances de TacheImpl<F,A> reçoivent à la construction un read_only_signal, qui deviendra vrai seulement quand leur fournisseur affirmera qu'il ne leur livrera plus de nouvelles données, de même qu'un F qui, à l'interne, sera nommé f_. Le réel rôle d'une TacheImpl<F,A> est décrit par sa méthode executer(), c'est-à-dire :
Lorsqu'il sera certain que le TacheImpl<F,A> ne recevra plus de nouvelles données et lorsque toutes les données en cours de traitement auront été traitées, le signal completed (reçu en paramètre) sera donné pour que l'étape suivante dans le pipeline soit informée qu'elle ne sera désormais plus alimentée. |
|
Pour des raisons pratiques, les canaux seront placés dans des pointeurs intelligents avec sémantique de partage (des shared_ptr), alors que les étapes seront placées dans des pointeurs intelligent avec responsabilité unique sur le pointé (des unique_ptr). |
|
Ajouter une étape fct au pipeline se fait comme suit :
|
|
Ajouter un canal d'un certain type est banal dans la mesure où l'on a compris comment fonctionne la mécanique des canaux. |
|
Insérer une étape t0 de type T0 au pipeline, d'une manière telle que t0 s'applique à une instance d'un certain type U, est une tâche simple dans le contexte de la mécanique mise en place jusqu'ici. La valeur retournée est un signal de fin d'alimentation destiné à la prochaine étape du pipeline. |
|
La généralisation de l'insertion d'étapes à un pipeline implique ajouter la première étape du lot, déterminer le type de ce que cette étape produira, et insérer les étapes subséquentes. La subtilité du traitement dans ce cas tient au relais des signaux de complétion d'une étape à l'autre. |
|
Démarrer le pipeline, à partir de sa méthode start(), signifie lancer chacune des étapes du pipeline pour que celles-ci opèrent toutes en parallèle. |
|
Construire un pipeline à partir d'un signal de complétion du fournisseur et d'une séquence d'étapes implique construire un canal pour les données entrantes et construire la chaîne d'étapes subséquentes. |
|
Accéder aux canaux entrant et sortant du pipeline sont deux opérations banales pour qui connaît les types impliqués. |
|
Enfin, nourrir le pipeline avec de nouvelles données entrantes implique démarrer l'exécution dudit pipeline. Le recours à un std::once_flag permet d'assurer que le code de démarrage du pipeline ne sera exécuté qu'une seule fois par programme. |
|
Ce qui suit détaille les éléments clés d'une implémentation d'un pipeline générique avec C++ 03. Je vous recommande de n'y avoir recours que si votre compilateur n'est pas à jour, car le tout est significativement plus fastidieux et plus lourd que la version C++ 17 proposée un peu plus haut.
Principal.cpp | |
---|---|
Débutons les explications par une présentation de ce qui sera le programme de test. Ceci nous permettra de voir comment il serait possible, en pratique, d'utiliser le pipeline avec des opérations quelconques. Les contraintes d'utilisation détaillées de cette implémentation du schéma de conception pipeline sont décrites plus bas (fichiers Pipeline.h et Pipeline.cpp). Pour le moment, nous limiterons nos explications à ceci :
|
|
Notre programme de test consistera en un pipeline de quatre étapes, opérant sur des chaînes de caractères standard (des std::string; j'écrirai simplement string ci-dessous par souci de simplicité). Notez que les types des opérations pourraient être plus variés, ceci n'était qu'un exemple. Les étapes de ce pipeline seront constituées d'un mélange de fonctions et de foncteurs unaires :
|
|
|
|
|
|
|
|
Le programme lui-même va comme suit :
Ici, notez que l'affichage final aurait avantageusement pu s'exprimer par
mais mon compilateur peine parfois avec les λ-expressions et plantait à la compilation, ce qui explique le recours à une répétitive for classique. |
|
Le résultat de l'exécution de ce programme de test sera :
emia'jemia'j
NOMNOM
!forp cihc!forp cihc
Le code client (le programme de test) ne peut se fier sur le fait que la file sortante du pipeline soit vide pour déterminer que le traitement a été complété, du fait que les diverses opérations du pipeline peuvent varier en termes de vitesse et sont faites de manière asynchrone. Un canal de sortie vide peut simplement signifier que certaines des opérations sont plus longues que prévu.
Notez aussi que rien n'empêche un pipeline de contenir des opérations de filtrage, de telle sorte que le nombre d'échantillons sortant pourrait être inférieur au nombre d'échantillons sortants. De même, rien n'empêche une opération du pipeline de consommer un échantillon et d'en produire plusieurs en sortie. Ainsi, le code client – responsable d'avoir construit le pipeline – doit prévoir des mécanismes pour évaluer si le pipeline a terminé son travail, du moins s'il est important pour lui de le savoir.
Ici, toutes les opérations prennent une chaîne en paramètre et retournent une chaîne, donc le nombre d'échantillons en entrée sera égal au nombre d'échantillons en sortie une fois le traitement terminé.
Le standard C++ 11 est une mise à jour importante du langage, mais malgré beaucoup de soin et d'attention, certains détails ont échappé aux membres du comité de standardisation.
L'un d'entre eux, tout simple, est qu'il existe un std::make_shared() pour instancier de manière sécuritaire et efficace un std::shared_ptr (pointeur intelligent avec sémantique de partage) mais qu'il n'existe pas de std::make_unique() pour faire l'équivalent avec un std::unique_ptr (pointeur intelligent avec sémantique de responsabilité exclusive).
Notez que std::unique_ptr<T> est covariant sur la base du type T. Il est en effet possible d'écrire ce qui suit :
template <class T, class D>
std::unique_ptr<T> creer()
{
return make_unique<D>();
}
si D est une dérivé de T. Cette caractéristique est importante dans notre code à l'intérieur de cet article.
Pour une explication de la pertinence d'une fonction telle que make_unique<T>(), voir cet article de Herb Sutter.
Vous remarquerez dans le code plus bas que j'utilise des std::shared_ptr mais que je ne me sers pas de std::make_shared<T>() pour les générer. La raison est que le recours à std::make_shared<T>() force, pour des raisons techniques, le type T à exposer un constructeur public, or il s'avère que pour les classes en fonction desquelles j'utilise des std::shared_ptr dans le présent article, j'ai préféré conserver les constructeurs privés, ou garder privées les classes à construire et à partager.
Sur ce site, des exemples de Mutex et verrous RAII sont disponibles, et offerts entre autres en version portable. Évidemment, C++ 11 offre de chic mécanismes de multiprogrammation, mais le compilateur que j'ai utilisé pour écrire le code présenté ici ne les supporte pas encore, donc les techniques « maison » ont dû être appliquées.
lockable.h | |
---|---|
Dans le but d'étendre un peu les techniques susmentionnées, j'ai défini un trait nommé lockable_traits, applicable à un outil de synchronisation (par exemple un Mutex). Il sera donc possible de déduire d'un type d'outil de synchronisation le type de verrou à lui appliquer. Évidemment, c'est une façon de faire, pas la façon. Procéder ainsi permet d'introduire de nouveaux outils de synchronisation dans un programme et de publier implicitement les mécanismes RAII pour s'en servir, mais cela réduit un peu la flexibilité du code client dans le cas où plusieurs variantes de synchronisation pourraient être rendues disponibles (verrou réentrant, verrou testable et autres). |
|
Un exemple concret de lockable_traits est donné dans Mutex.h, plus bas.
Avec C++ 11 vient une standardisation de traits sur les types, à travers la bibliothèque <type_traits>, ce qui remplace avantageusement plusieurs techniques classiques de métaprogrammation. Ces traits standards seront utilisés à quelques endroits dans le code présenté dans cet article.
unary_function_traits.h | |
---|---|
Dans le cas de l'implémentation d'un pipeline proposée ici, j'ai surtout besoin de savoir deux choses sur les opérations à appliquer, soit le type de paramètre attendu par une opération et le type de la valeur retournée par cette opération. En effet, un pipeline de opérations aura la forme suivante : ce qui, pris séquentiellement, ressemblerait à : où chaque est une zone de transit pour les échantillons et chaque est une transformation. Le code client détermine les , mais le pipeline doit valider que les intrants d'une opération peuvent être construits à partir des extrants de l'opération précédente. De même, le pipeline doit mettre en place les zones de transit pour que ceux-ci soient du type retourné par . Pour cette raison, les traits argument_of et result_of seront utilisés et permettront de raisonner sur les types impliqués. |
|
Le code d'argument_of déduit le type du paramètre d'une fonction unaire de sa signature et suppose qu'un foncteur unaire aura défini le type interne et public argument_type. Le code de result_of fait de même mais suppose plutôt d'un foncteur (unaire ou non) la présence d'un type interne et public result_type. Pour les fins du pipeline, seules les opérations unaires seront considérées.
J'utiliserai ici un Mutex portable au sens du code client (mais dont l'implémentation sous-jacente est spécifique à pour une plateforme donnée) exploitant sur les idiomes incopiable et pImpl. Bien qu'incopiable, le Mutex sera déplaçable, implémentant la sémantique de mouvement.
Mutex.h | |
---|---|
Vous pouvez vous référer la l'article sur les Mutex portables pour des détails quant aux bases de l'implémentation proposée ici. Notez que le verrou RAII pour les Mutex proposés ici est accessible à travers une spécialisation du trait lockable_traits applicable au type Mutex. J'ai fait un verrou simple ici, mais il aurait été facile d'ajouter plusieurs saveurs outre locker. Pour que la technique soit pertinente, il faut bien sûr que les lockable_traits définis pour d'autres types d'outils de synchronisation expriment leurs outils privilégiées sous le nom locker et sous la même sémantique d'utilisation que celle proposée ici. |
|
Notez que la qualification Incopiable aurait pu être omise ici du fait qu'au moins un des attributs d'un Mutex, le std::unique_ptr<Impl>, et lui-même incopiable (mais déplaçable).
J'ai construit mon implémentation de pipeline de manière à ce qu'un Pipeline donné ne puisse être démarré qu'une seule fois – il est toutefois possible de démarrer plusieurs pipelines, ce qui implique que redémarrer un pipeline donné puisse être simulé par la destruction du pipeline puis par la reconstruction d'un autre pipeline avec la même séquence d'opérations que celle prise en charge par le pipeline original.
Pourquoi ai-je écrit cette classe? Parce que mon pipeline exposait un grand nombre de constructeurs (pour des raisons que j'expliquerai plus bas) et que je ne voulais pas oublier, dans l'un ou l'autre d'entre eux, d'initialiser mon indicateur de pipeline démarré ou non correctement. En utilisant le type once_signal plutôt qu'un booléen, le problème potentiel s'est réglé de manière implicite.
L'idée d'une zone de transit pour assurer une communication symchronisée entre deux threads constitue une structure de données classique en multiprogrammation.
Nous arrivons maintenant à l'implémentation du pipeline en tant que telle. J'ai choisi d'y aller d'une classe concrète, Pipeline, qui dissimulera les validations de correspondance de types et la gestion des zones de transit « sous la couverture », de sorte que le code client en sera simplifié.
Pipeline.h | |
---|---|
Une instance de Pipeline sera essentiellement constituée d'une liste de tâches (des objets encapsulant chacune une transformation) et de canaux (des zones de transit). Le premier problème auquel nous ferons face sera celui des types impliqués. En effet, supposons un pipeline de la forme ; les canaux sont les alors que les transformations sont les . Sachant cela, il n'est pas clair que les types de , et soient les mêmes, tout comme il n'est pas clair que les types de et soient les mêmes. Pourtant, il nous faut les entreposer dans un même conteneur, et les conteneurs de C++ sont typés. Il faut donc placer dans tache_ des éléments de même type (le type Pipeline::Tache), tout comme il faut placer dans canal_ des éléments de même type (le type Pipeline::Canal). Nous verrons plus bas, en examinant les classes Tache et Canal, comment nous y arriverons. |
|
Les méthodes add_step() et add_channel() ont pour rôle d'aider à construire le pipeline. Dans le cas de add_step(), qui prend une opération du type générique F en paramètre, on parle d'ajouter une transformation à la liste de celles prises en charge par le pipeline. Ce paramètre est important du fait que si F est un pointeur de fonction, alors il faut savoir l'adresse de la fonction à exécuter, alors que si F est un foncteur, il est possible que celui-ci ait des états à tenir à jour. Dans le cas de add_channel(), aucun paramètre n'est requis mais le type T des éléments à placer dans la zone de transit doit être indiqué à l'appel. C'est le pipeline lui-même qui instanciera une zone_transit<T> vide lors d'un appel de cette méthode. Ces services sont qualifiés privés du fait que la cohérence entre les canaux et les tâches, en nombre ou en termes de types impliqués, est essentielle au bon fonctionnement du pipeline. |
|
Pour faciliter l'écriture du code, j'ai rédigé un service de classe générique validate_chaining(F0,F1) qui ne compilera que s'il est possible d'enchaîner une transformation de type F0 par une transformation de type F1. Ce raisonnement se base sur les types impliqués : il faut que la sortie de F0 puisse être convertie implicitement dans l'entrée de F1. |
|
Par exemple si F0 est string (*)(const char*) et si F1 est double (*)(string), alors validate_chaining(F0,F1) compilera car la sortie de F0, une string, peut être passée en entrée à F1.
En retour, si F0 est double (*)(string) alors que F1 est string (*)(const char*), alors validate_chaining(F0,F1) ne compilera pas, car la sortie de F0, un double, ne peut être implicitement convertie en l'entrée de F1, un const char*.
Maintenant, examinons le code par lequel se construit l'enchaînement de transformations, soit l'ensemble de méthodes build_chaining(). Faute d'avoir un compilateur supportant les templates variadiques, j'ai écrit des versions de cette méthode allant jusqu'à concurrence de dix transformations, en construisant un enchaînement de transformations à partir d'un enchaînement de transformations. Avec des templates variadiques, dont vous trouverez un exemple ici, ce code fondra comme neige au soleil. |
|
Tout comme les méthodes build_chaining() présentées plus haut, les constructeurs supportent jusqu'à dix transformations (mais on pourrait en ajouter plus au besoin). Évidemment, une solution basée sur les templates variadiques serait plus souple et plus élégante. Un constructeur de Pipeline met en place le canal d'entrée puis construit l'enchaînement des transformations. Suite à un constructeur de Pipeline de transformations, le nombre de canaux sera donc . |
|
Les méthodes entering() et exiting() sont typées sur la base du type de données dans la zone de transit entrante et sortante (respectivement) du pipeline. Leur implémentation est expliquée un peu plus bas. Évidemment, leur signature implique que le code client connaisse le type des données entrantes, et qu'il connaisse aussi le type résultant de la dernière transformation du pipeline. Ces méthodes n'ont aucun sens sur un Pipeline vide, mais cet état n'est pas possible à travers nos constructeurs. |
|
Alimenter le pipeline correspond à ajouter des éléments dans la zone de transit menant vers la première transformation (techniquement : ajouter à pour alimenter ). La classe Pipeline proposée ici offre deux services en ce sens, soit feed(elem) pour ajouter un seul élément et feed(debut,fin) pour ajouter d'un coup tous les éléments d'un intervalle à demi ouvert. Évidemment, la version opérant sur un intervalle est beaucoup plus rapide que celle n'opérant que sur un seul élément, les coûts associés à la synchronisation de la zone de transit se trouvant amortis sur l'ensemble des éléments ajoutés. |
|
Il aurait été plus rapide en général de séparer le démarrage (appel à la méthode start() et tests sur started_) de l'alimentation. J'ai choisi de joindre les deux dans le but d'alléger le code client, sans plus : un Pipeline alimenté au moins une fois sera implicitement démarré.
L'abstraction représentant une transformation dans une instance de Pipeline sera la classe interne Tache. Toutefois, Tache elle-même sera surtout une façade devant des transformations spécifiques. Remarquez la fonction globale amie prendre_en_charge(). C'est cette fonction qui, appelée par un thread de la plateforme, transformera une abstraction pure en une tâche et lui permettra de s'exécuter. La technique appliquée ici ressemble un peu à celle de l'idiome pImpl ou de la classe peu_importe; si vous souhaitez explorer un peu plus par vous-même, voilà des pistes pour vous aider à démarrer votre réflexion. |
|
Dans une Tache se trouvent deux niveaux de classes internes. Le premier niveau, Executable, représente les détails propres à la gestion d'un thread :
On y trouve aussi les canaux entrants et sortants (in_ et out_) pris sur une base abstraite (non-typée), et un service polymorphique executer() pour lancer à proprement dit l'exécution de cette partie du pipeline. |
|
Le deuxième niveau de classe interne, ExecutableImpl<F>, est sans doute le plus important. À tout le moins, c'est le plus actif, connaissant l'opération unaire à appliquer dans sa strate du pipeline (f_, de type F). Sa méthode executer() réalise le traitement d'une étape du pipeline à proprement dit :
|
|
Ne reste plus qu'à parler du type Pipeline::Tache en tant que tel. En fait, puisque ce type n'est pas générique (et ne peut l'être, servant de type général pour les tâches dans le pipeline), il ne peut pas seul traiter les divers cas possibles de transformations susceptibles de contribuer au pipeline dans son ensemble. Pour cette raison, il garde un pointeur sur un Executable, offre une interface simple pour le démarrer et l'exécuter, et permet de lui indiquer quels canaux (abstraits) utiliser. Le type Executable est polymorphique mais n'est pas générique; en retour, les enfants d'Executable, instances de ExecutableImpl<F> pour divers types F, sont génériques et connaissent précisément les types impliqués, ce qui leur permet de réaliser une prise en charge typée des opérations. |
|
L'approche mise en place pour Pipeline::Canal est similaire à cette appliquée dans le cas de Pipeline::Tache.
Reste à voir comment sont implémentés les quelques services restants de Pipeline. Certains, génériques, sont détaillés dans Pipeline.h, alors que les autres apparaissent dans Pipeline.cpp.
La méthode add_step(F) procède en quatre temps :
|
|
L'ajout d'un Pipeline::Canal, réalisé à travers la méthode add_channel(), est une opération typée mais sans paramètre. C'est au pipeline lui-même de créer les zones de transit, pas au code client. Ceci contraste avec les opérations (add_step(F), plus haut) qui, elles, sont la responsabilité du code client – le pipeline, lui, est responsable de leur organisation. |
|
Enfin, des services permettant au code client d'obtenir les zones de transit entrante et sortante du pipeline sont offertes. Notez que le code client doit au moins connaître les types des zones de transit demandées pour être en mesure de solliciter ces services correctement. |
|
Examinons maintenant les services associés à un Pipeline mais qui sont définis à même un fichiers source distinct.
Pipeline.cpp | |
---|---|
Dans le but de dissocier un peu plus la déclaration du Pipeline (dans Pipeline.h) d'une plateforme sous-jacente ou de l'autre, j'ai préféré déclarer une fonction générale, prendre_en_charge(void*), sans affiliation particulière avec une plateforme ou l'autre, amie de Tache plutôt que de faire de même avec une fonction dont la signature est associée de près avec la plateforme (execute_task(void*) dans ce cas-ci). C'est une petite touche, et ça ne coûte rien, mais c'est utile dans une optique d'entretien du code. Le recours à l'amitié ici est dû au fait qu'un accès à Executable est requis pour exécuter effectivement une étape du pipeline, or cette classe est privée dans Tache. |
|
Le destructeur de Pipeline est vide, mais nécessaire. La raison est subtile :
|
|
La méthode start() démarre chaque étape du pipeline, dans l'ordre. Notez qu'il y a condition de course ici dans la phase entre started_.test() et started_.set(). Je l'ai laissée là parce qu'elle ne dérangeait pas vraiment mon programme de test quelque peu banal, mais en pratique il faudrat s'en occuper. Comment règleriez-vous le problème? |
|
La classe Rep, interne à Tache::Executable (oui, on parle de quatre niveaux de classes imbriquées ici) sert de représentation simplifiée pour un thread de la plateforme sous-jacente. Ses responsabilités sont limitées : permettre d'attendre la fin du thread, entreposer sa représentation, démarrer un thread pour un Executable donné, et faire un peu de nettoyage. Normalement, j'aurais fait en sorte qu'une classe comme Rep s'assure, dans son destructeur, que le signal de fin soit envoyé au thread et qu'une attente de la fin effective de son exécution soit faite. Ici, par contre, du fait que je n'entrepose par un pointeur sur l'Executable associé à un thread dans Rep, j'ai choisi de laisser cette responsabilité à d'autres. |
|
Le constructeur par défaut d'Executable est défini ailleurs que le lieu de sa déclaration, pour les mêmes raisons que celles invoquées pour le destructeur de Pipeline, plus haut. Notez que rep_ n'a pas à être explicitement initialisé à nullptr du fait que le constructeur par défaut d'un pointeur intelligent fait déjà ce travail. |
|
Le destructeur d'Executable s'assure d'émettre au thread qui lui est associé un signal de fin et d'attendre la fin effective du thread avant de mourir lui-même (et d'entraîner la représentation rep_ avec lui). On ne veut pas de threads sauvages s'amusant avec les ressources du système, après tout. |
|
Enfin, le démarrage d'un Executable implique la construction d'une représentation (un Rep) et l'invocation de son service start() appliqué à l'Executable lui-même. |
|
En espérant que le tout vous soit utile...