Ce document montre deux versions d'un programme utilisant des futures pour réaliser du code en parallèle :
Ce qui suit détaille un exemple complet et opérationnel de programme implémentant un calcul séquentiel (la comptabilité du nombre de mots dans un fichier, appliqué à plusieurs fichiers; probablement le problème canonique dans le genre) et son implémentation parallèle, entre autres dans une optique de cibler intelligemment le moment le plus opportun pour passer d'une stratégie à l'autre. Nous explorerons au passage le schéma de conception Regroupement de threads (Thread Pool) et la métaphore de la programmation par promesses, à l'aide de futures.
Le problème que nous atttaquerons sera un classique dans le genre, c'est-à-dire compter les mots dans un fichier. C'est un problème qui, pour un ensemble de fichiers donné, se résout bien séquentiellement (on ouvre un fichier, on compte les mots qui y apparaissent, on le ferme, on passe au suivant), mais qui est tel qu'il est fortement parallélisable (le traitement de chaque fichier est indépendant de celui des autres; seul les moments où sont colligés les résultats est critique).
L'exemple proposé est complet et comprend plusieurs modules. Dans certains cas, les concepts proposés sont détaillés ailleurs sur le site de votre chic prof, alors soyez tolérant(e)s si vous rencontrez des renvois ici et là. J'ai essayé de présenter les modules en ordre croissant de dépendances, pour que les trucs les plus indépendants (les moins fortement couplés) apparaissent d'abord. J'espère que cela vous conviendra. Entre autres, nous aurons recours à l'idiome d'objets incopiables, à des mutex portables, des sections critiques portables et à des événements portables (que je laisse en exercice). Nous utiliserons aussi des assertions statiques, une technique de métaprogrammation bien connue.
Le code en illustration sera découpé en tronçons :
Vous pourrez ainsi naviguer rapidement les sections qui sont moins essentielles au propos, à moins que vous n'ayez un intérêt pour les manoeuvres qui s'y trouvent (et qui sont, en soi, parfois assez amusantes et assez instructives). En escamotant des sections, évidemment, soyez conscient(e)s qu'il est possible que vous deviez faire un acte de foi ici et là en lien avec la technique escamotée au passage.
Vous remarquerez ici et là des commentaires indiquant des raffinement à apporter. Si vous avez envie de vous amuser...
Nous mettrons en place un système capable de comptabiliser le nombre de mots total apparaissant dans un ensemble de fichiers. Il est possible de comptabiliser le même fichier plusieurs fois si le code client le souhaite (filtrer la liste des noms de fichiers pour en éliminer la redondance n'est pas le propos de cet exercice), et on considère un mot comme étant une séquence de symboles séparés par des blancs, ce qui permettra de consommer des mots aussi simplement qu'on consommerait des std::string sur un std::istream à l'aide de l'opérateur >>.
Pour ce faire, nous utiliserons entre autres des futures. Une future est un objet représentant la promesse d'un résultat, qui peut être testé de manière non-bloquante pour savoir si le traitement qui lui est associé a été complété, et qui peut être invoqué de manière bloquante pour en extirper le résultat. Un exemple simplifié d'utilisation d'une future serait celui exposé à droite. Vous remarquerez sans peine un découplage fort entre les détails techniques de la plateforme sous-jacente, et une grande simplicité d'utilisation. Il faut cependant travailler assez fort pour en arriver à cela, comme vous le constaterez. La construction d'une future fait en sorte de traiter concurremment la tâche qui lui est associée (ce qui peut être fait en lançant un thread, en demandant à un tiers sur un autre ordinateur de faire le travail, en ajoutant une tâche dans une liste à traiter par un regroupement de threads comme dans cet exemple, etc.). Invoquer l'opérateur () (traiter la future comme une fonction) bloque, s'il y a lieu, jusqu'à obtention du résultat, puis retourne ce résultat. Tester la complétion de la future (ici, la traiter comme un booléen) est une tâche non-bloquante, ce qui permet au code client de vaquer à d'autres occupations. |
|
Une future plus complète que celle qui est proposée ici permettrait aussi d'annuler une tâche et de copier la future à loisir. Je vous invite à raffiner le tout si vous le souhaitez.
Nous utiliserons aussi un regroupement de threads (en anglais : Thread Pool). Plutôt que de lancer des threads à la pièce, nous en construirons un certain nombre au préalable, et nous leur donnerons tous une seule et même responsabilité : piger une tâche parmi celles qui sont en attente de traitement, la prendre en charge, puis recommencer. Ces threads seront des ouvriers, ou Worker Threads.
Ceci permettra l'implémentation de futures de manière quelque peu simplifiée, et limitera le coût du démarrage des threads à celui encouru au moment initial (mis à part les moments où le système sera tellement sollicité par des tâches en cours qu'il devra accroître le nombre d'ouvriers à l'oeuvre. Les regroupements de threads foisonnent sur les plateformes contemporaines; vous pouvez remplacer celui proposé ici par celui de votre plateforme de prédilection dans la mesure où vous aurez pris soin d'adapter le code quelque peu.
Reste à voir comment, concrètement, nous compterons les mots de manière séquentielle (le code montrant le traitement en version parallèle a été placé dans la section sur les futures, un peu plus bas).
Une tautologie est une expression logique qui est vraie peu importe les conditions initiales. Pour des raisons techniques sur lesquelles nous reviendrons plus bas, nous utiliserons le foncteur générique whatever qui évalue toujours à vrai. Depuis C++ 14, il est possible d'écrire plus simplement ceci :
|
|
Compter les mots en série est simple : prendre chacun des fichiers à traiter, y compter les mots et retourner le cumul obtenu. Cette approche fonctionne très bien en pratique, et peut être réinvestie par la version parallèle quand le seuil choisi pour passer de parallèle à séquentiel a été atteint. J'ai séparé le traitement en deux : compter les mots dans un seul document, puis compter les mots dans plusieurs documents (en appelant plusieurs fois la fonction comptant les mots dans un seul document). |
|
Le programme principal réalisera
un ensemble de tests et exposera, dans un fichier de sortie, le fruit des calculs
réalisés. Pour l'invoquer, il suffira de lancer le programme avec
une liste de noms de fichiers, et de le laisser travailler. Les fruits de ce
programme permettront entre autres de trouver un bon seuil de transition séquentiel/
parallèle pour un système donné.
Pour l'essentiel, donc, ce code lance des tâches de traitement séquentiel ou parallèle, tire des métriques, et entrepose le tout dans un fichier de sortie pour consultation ultérieure, à tête reposée. Ce programme fait la démonstration qu'il est possible d'utiliser le regroupement de threads et les futures, mais sert surtout à déterminer les conditions idéales utilisation de ces outils dans un contexte appliqué.
#include "statistiques.h"
#include "compter_mots.h"
#include "compter_mots_seriel.h"
#include "compter_mots_map_reduce.h"
#include "minuterie.h"
#include <sstream>
#include <iostream>
#include <iterator>
#include <algorithm>
#include <string>
#include <sstream>
using namespace std;
const char *terminaison(bool pluriel)
{
static const char * TERMINAISONS [] = { "", "s" };
return TERMINAISONS[pluriel? 1 : 0];
}
const char *config()
{
#ifdef _DEBUG
return "(DEBUG)";
#else
return "(RELEASE)";
#endif
}
template <class It>
double tester_seriel(It debut, It fin, ostream &sortie, int n_essai)
{
stringstream sstr;
sortie << "Approche sérielle, essai " << n_essai << endl;
int n_seriel;
double ecoule = 0.0;
{
minuterie minu{sstr, &ecoule};
n_seriel = compter_mots_multidocuments_seriel(debut, fin);
}
sortie << '\t' << n_seriel << " mots au total" << endl;
sortie << '\t' << sstr.str() << endl;
return ecoule;
}
template <class It>
double tester_parallele(It debut, It fin, ostream &sortie, int n_essai, int seuil = CompterMots<It>::SEUIL_DEFAUT)
{
stringstream sstr;
sortie << "Approche parallele, essai " << n_essai << ", seuil sequentiel " << seuil << endl;
int n_map_reduce;
double ecoule = 0.0;
{
minuterie minu{sstr, &ecoule};
n_map_reduce = compter_mots_multidocuments_map_reduce(debut, fin, seuil);
}
sortie << '\t' << n_map_reduce << " mots au total" << endl;
sortie << '\t' << sstr.str() << endl;
return ecoule;
}
template <class It>
void production_statistiques(const std::string &titre, It debut, It fin, std::ostream &os)
{
os << "Version " << titre << ":\n"
<< "*\tmeilleur resultat: " << *min_element(debut, fin) << " ms\n"
<< "*\tpire resultat: " << *max_element(debut, fin) << " ms\n"
<< "*\ttemps moyen: " << moyenne(debut, fin) << " ms\n"
<< "*\tecart type: " << ecart_type(debut, fin) << " ms\n"
<< endl;
}
int main(int argc, char *argv[])
{
ofstream sortie{"out.txt"};
const int NDOCS = argc - 1;
enum { NTESTS = 10 };
sortie << "Compter les mots, " << NDOCS
<< " document" << terminaison(argc > 2)
<< ", " << NTESTS << " test" << terminaison(NTESTS > 2)
<< ' ' << config() << "\n\t";
copy (argv + 1, argv + argc, ostream_iterator<char*>{sortie, "\n\t"});
sortie << endl;
enum { NSEUILS = 20 };
double resultats_sequentiel[NTESTS] = { 0.0 };
double resultats_parallele[NTESTS] = { 0.0 };
double resultats_parallele_seuil[NSEUILS][NTESTS] = { {0.0} };
for (int i = 0; i < NTESTS; ++i)
{
resultats_sequentiel[i] = tester_seriel(argv + 1, argv + argc, sortie, i + 1);
resultats_parallele[i] = tester_parallele(argv + 1, argv + argc, sortie, i + 1);
for (int j = 0; j < NSEUILS; ++j)
resultats_parallele_seuil[j][i] = tester_parallele(argv + 1, argv + argc, sortie, i + 1, j + 1);
sortie << endl;
cout << '.' << flush;
}
cout << endl;
//
// Production de statistiques simples
//
production_statistiques("sequentielle", resultats_sequentiel, resultats_sequentiel + NTESTS, sortie);
production_statistiques("parallele", resultats_parallele, resultats_parallele + NTESTS, sortie);
for (int i = 0; i < NSEUILS; ++i)
production_statistiques("parallele avec seuil sequentiel " + to_string(i+1),
resultats_parallele_seuil[i],
resultats_parallele_seuil[i] + NTESTS,
sortie);
double moyennes[NSEUILS];
for (int i = 0; i < NSEUILS; ++i)
moyennes[i] = moyenne(begin(resultats_parallele_seuil[i]), end(resultats_parallele_seuil[i]));
sortie << "\n\nMeilleur temps moyen: ";
double *p = min_element(begin(moyennes), end(moyennes));
sortie << *p << " ms. avec un seuil sequentiel de " << static_cast<int>(p - moyennes + 1) << endl;
thread_pool_::get().log_stats(sortie);
}
Tel que mentionné plus haut, une future encapsule la promesse du résultat d'un calcul. Les futures que nous utiliserons ici seront couplées avec l'implémentation du regroupement de threads dont nous discuterons plus bas, mais c'est plus un choix d'implémentation de la part de votre humble serviteur qu'une condition technique ou conceptuelle à proprement dit.
Les extensions par traits à la bibliothèque <functional> ont été couverts plus tôt, alors je ne répéterai pas les explications ici. Disons seulement que, pour que les futures puissent capturer la valeur retournée par l'opération qu'elles prennent en charge, il leur faudra connaître le type dans lequel cette valeur sera exprimée. Les traits d'opérations nullaires serviront précisément à cela. Notez qu'une partie de l'encapsulation permise la l'implémentation de futures proposée ici repose sur des délégués, qui constituent un outil plus lourd dans notre arsenal conceptuel. Si vous souhaitez en savoir plus sur le sujet, il vous faudra travailler un peu plus fort. Les outils de cette section sont dépréciés, s'harmonisant mal avec la surcharge de fonctions, mais je n'ai pas encore eu le temps de réécrire le tout en fonction des pratiques contemporaines. |
|
Le Thread Pool lancera plusieurs threads ouvriers (ce qu'on nomme typiquement des Worker Threads), qui prendront en charge des tâches, représentées par la classe Tache visible à droite. Une Tache possèdera un identifiant entier (pour faciliter le débogage). Elle pourra être prise en charge et exécutée (méthode abstraite faire()), testée de manière non bloquante pour vérifier si elle a été complétée, et permettra qu'on se mette en attente bloquante de la fin de son exécution. D'autres services (attente de la complétion jusqu'à concurrence d'un certain délai maximal) pourraient être ajoutés à cette classe si vous le jugez nécessaire. |
|
Pour des raisons techniques, nous définissons la variable représentant l'identifiant de Tache le plus récemment attribué dans un fichier source. En C++, il est possible de définir les constantes de classe entières dans un fichier d'en-tête, mais les constantes de classe qui ne sont pas entière et les attributs de classe non-constants doivent être définis dans une unité de traduction qui assurera leur unicité (donc, typiquement, dans un fichier source, puisque personne ne les inclura). |
|
Les threads ouvriers seront capables de prendre une Tache en charge, de l'exécuter et de s'en débarrasser. Ce sont ces ouvriers qui assureront la dynamique du système. Là où chaque Tache représentera ce qui doit être fait, chaque Worker représentera l'action de faire cette chose. La méthode clé ici est prendre_en_charge(), qui assure la prise en charge d'une tâche par un ouvrier. Elle est décrite plus bas. Notez que, bien qu'un Worker ait pour rôle de manipuler des instances de Tache, l'interface entière de la classe Worker n'a aucunement besoin de connaître le détail de la classe Tache ou la taille des instances de cette classe (dans l'interface de Worker, le seul endroit où le nom Tache est utilisé, il l'est à travers un pointeur). Pour cette raison, seul le nom de la classe Tache est introduit ici (instruction struct Tache;), limitant ainsi le couplage entre les fichiers. |
|
Le code de prise en charge d'une Tache par un ouvrier est dissimulé dans un fichier source pour fins de réduction de couplage. Sans grande surprise, quand un ouvrier prend en charge une Tache, il lui dit de se faire, et c'est tout. |
|
Pour faciliter la jonction entre les instances de future et les instances de Tache, une classe générique TacheHolder sera utilisée. Tache est une racine polymorphique, ce qui permet de placer des pointeurs sur toutes les tâches dans un seul et même conteneur et d'exprimer le regroupement de threads sur la base de tâches au sens large, mais lorsqu'il vient le temps d'implémenter des tâches spécifiques, nous instancierons un TacheHolder<F> pour chaque tâche de type F. L'abstraction des détails techniques de F passera par un délégué nullaire, ce qui est d'ailleurs un excellent rôle pour des délégués. Cette classe dépose l'opération à faire dans un délégué, et implémente le service Tache::faire() à l'aide d'une invocation du délégué en question. Ce faisant, l'opération encapsulée par une Tache donnée pourra être une fonction nullaire ou un foncteur nullaire. Le TacheHolder déduira le type de donnée retourné par la tâche qu'encapsulera le délégué, et saura retenir une copie de cette donnée dans un attribut d'instance, permettant ainsi le découplage temporel entre le travail fait par la future et les besoins du code client. On trouve des idiomes analogues dans d'autres types de données exigeant à la fois une interface non-générique (pensez à Tache) mais une implémentation générique (pensez à TacheHolder). Pour un exemple, voir la technique de l'effacement de types. |
|
Le code parallèle pour compter des mots dans des fichiers repose sur le code séquentiel accomplissant la même tâche. La ligne clé est : cumul_ = fA() + fB(); qui, sans trop que cela ne paraisse, cumule le fruit de deux calculs réalisés en parallèle. Le seuil par défaut utilisé ici pour passer de séquentiel à parallèle est complètement arbitraire. Un bon seuil serait déterminé de manière empirique. La fonction creer_compter_mots() est une fonction de fabrication qui allège la construction des objets qui seront encapsulés par les TacheHolder* cachés derrière les futures en déduisant les types de ses paramètres. Le seuil à partir duquel l'ensemble des fichiers à traiter est suffisamment petit pour procéder de manière séquentielle peut être choisi à la construction d'une instance de CompterMots, ce qui facilite la rédaction des tests de performance (voir plus bas). |
|
Le regroupement de threads sera un singleton, mais c'est plus une question de simplicité que de nécessité car il serait possible d'envisager un système à plusieurs regroupements de threads distincts. Une partie importante du code que vous trouverez dans cette section tire des statistiques d'utilisation; le code du regroupement en soi est plus léger qu'il n'y paraît.
Ici, N est la taille initiale du regroupement. J'utilisais à l'origine des tableaux de taille fixe pour h_ et pour worker_, mais en pratique des approches comme Map/ Reduce consomment beaucoup de threads qui sont mis en attente du fruit du calcul d'autres threads, et on se retrouve avec une croissance rapide de threads dormants. Conséquemment, j'ai choisi d'y aller avec un compromis, soit le vecteur qui offre des temps d'accès et de parcours comparables (pratiquement identiques) à ceux d'un tableau brut mais qui permettent d'adapter leur capacité en fonction des moments. Ici, notez que vous pourriez avoir plusieurs instances de thread_pool dans la mesure où la valeur attribuée à N serait chaque fois différente. Je ne sais pas à quel point ce serait utile, mais l'implémentation actuelle le permet. Un thread_pool comprendra, en gros, une file de tâches à faire (attribut taches_, synchronisé par l'attribut mutex_) et une liste d'ouvriers (et de leurs threads), représentée par deux conteneurs (h_ pour les HANDLE et worker_ pour les ouvriers eux-mêmes). Le regroupement est synchronisé pour deux raisons distinctes, ce qui explique la présence de deux mutex :
N'oubliez pas que la classe Mutex représente le concept de synchronisation par accès exclusif à une ressource. Ici, il est probable qu'une implémentation sous-jacente locale au processus (une section critique plutôt qu'un outil du système d'exploitation) soit avantageuse. Dans le cas où le système est plutôt plein (voir la méthode du même nom), le regroupement pourra redimensionner la liste des ouvriers pour éviter un phénomène d'attrition des forces de travail. Remarquez que la méthode qui procède à cette redimension est privée, ce qui explique qu'elle puisse supposer que son client, étant interne à la classe, se soit comporté correctement et ait au préalable synchronisé les accès sur les structures de données appropriées. Remarquez aussi que la politique de redimensionnement appliquée ici est d'ajouter un nombre fixe d'ouvriers; ce n'est pas la seule approche possible. La décision d'ajouter ou non des ouvriers se fait suite à la saisie d'une nouvelle tâche dans le lot des tâches disponibles. Si le système est saturé, alors le regroupement de threads recrutera de nouveaux employés, en quelque sorte. Le recours à un regroupement de 32 threads par défaut est absolument arbitraire. Pour valider la valeur intiale idéale, il faudrait procéder à des tests plus poussés, ce que je n'ai pas fait. |
|
Nous utilisons plusieurs petits utilitaires (souvent des foncteurs) dans le code présenté ici.
Convertir un booléen en entier (et inversement) dès la compilation est chose somme toute simple. Certaines validations dans le code implémentant les délégués reposent sur la capacité de réaliser agréablement ces conversions. |
|
Nous aurons parfois recours à un type qui représentera l'absence de types (le type Vide). Nous le décrirons de la manière visible à droite. Ici encore, être capable de représenter l'absence de type par un type nous permettra d'exprimer correctement des délégués sur un nombre variable (aux yeux du code client) de paramètres. |
|
L'implémentation de délégués que nous utiliserons demande de choisir, à l'occasion, un type parmi plusieurs. Le static_switch visible à droite joue ce rôle. Le schéma proposé ici pourrait être étendu facilement à un plus grand nombre de types, au besoin, dans la mesure où vous seriez disposé(e) à ajouter le code requis, mais c'est un peu fastidieux... |
|
Définir des délégués (concept fréquemment rencontré en C# et en Delphi) est possible en C++, Expliquer le code demanderait beaucoup de temps, mais ça ne veut pas dire que le code soit vraiment difficile à rédiger... L'essentiel du travail ici a été fait par Patrick Boutot, diplômé du DDJV à l'Université de Sherbrooke. Je l'en remercie! Quand j'aurai le temps, je retoucherai le tout pour profiter des mécanismes de C++ 11, en particulier des templates variadiques. |
|
Voilà.
Le code qui précède est lourd mais fonctionne. Cependant, depuis C++ 14, le tout devient nettement plus simple. Je ne reprendrai pas ici toutes les explications proposées plus haut, me limitant à expliquer les simplifications, disparitions, ajouts et autres ajustements.
Notez que j'ai évacué le volet « statistiques » pour cette version, dans le but de ne pas obscurcir le propos (si vous souhaitez le réintégrer, il est sensiblement comme celui des exemples précédents). J'ai strictement gardé le volet décrivant le temps écoulé en fonction de la charge de travail et des seuils de séparations des tâches.
La comptabilité sérielle du nombre de mots dans un fichier demeurera telle quelle, à ceci près que l'amusant type whatever devient redondant maintenant que nous avons des expressions λ génériques. |
|
Le volet Map/ Reduce est simplifié du fait que nous avons recours à std:future et à std::async(). |
|
J'ai écrit une version simplifiée de l'algorithme moyenne() mais acceptant une fonction de transformation des données cumulées. La raison pour ce changement est que dans cette version, le code de test (plus bas) cumulera le temps non pas sur des double, mais bien sur des high_resolution_clock::duration, ce qui impliquera éventuellement d'aller chercher le nombre de tics cumulés (méthode count()) pour réaliser la division qu'implique ce calcul. Dans les cas où aucune transformation du cumul n'est requise, le foncteur noop sera utilisé en lieu et place de cette transformation. |
#i |
Le programme de test suit. Remarquez d'office à quel point l'échafaudage a été simplifié dans cet exemple plus contemporain (vous pouvez comparer avec l'exemple précédent pour faire le constat vous-mêmes). La fonction terminaison(), décorative, peut maintenant être résolue à la compilation, et il en va de même pour la fonction config(). |
|
J'ai écrit une fonction générique de test, retournant une paire faire du résultat d'un calcul et du temps écoulé pour l'effectuer. Ceci remplace avantageusement la classe de minuterie des exemples précédents. |
|
Les tests sériel et parallèle n'ont que peu changé, outre le fait qu'ils sont plus simples dû au mécanisme de test dont nous venons de discuter. Les expressions λ sont un outil merveilleux pour simplifier l'écriture des programmes. |
|
J'ai modifié le programme en soi sur quelques aspects :
|
|
Voilà : une solution nettement plus simple, plus compréhensible, et probablement plus efficace.