Ce texte présume chez vous une familiarité avec les algorithmes standards, les conteneurs standards et le langage C++, en particulier le standard C++ 11. Comprendre les expressions λ vous aidera à suivre certains des raisonnements ci-dessous.
Notez que, bien que le code proposé ci-dessous soit fonctionnel et correct, il ne se veut pas de qualité industrielle et n'a pas été testé rigoureusement sur plusieurs plateformes. Si vous avez accès à de véritables implémentations commerciales d'algorithmes tels que ceux décrits ici, privilégiez-les.
L'exemple canonique d'approche Map/ Reduce est le calcul en parallèle du nombre de mots dans un ensemble de fichiers. Ce qui suit montre une implémentation de ce calcul, avec comparatifs pour la vitesse d'exécution séquentielle et pour la vitesse d'exécution parallèle, en utilisant divers seuils en termes de nombres de fichiers à traiter pour diviser le traitement.
Il est possible, en général, sur une même machine, de se rapprocher de l'optimalité en divisant de prime abord le traitement en fonction du nombre d'unités de traitement matérielles disponibles (en C++ 11, on parle de se baser sur std::thread::hardware_concurrency, qui retourne un entier non-signé représentant le nombre d'unités de traitement matérielles disponibles), surtout pour des tâches qui sont CPU-Bound. Dans une approche Map/ Reduce, il est d'usage de diviser le traitement en deux et de répartir les calculs sur deux ordinateurs distincts, et c'est cette démarche qui est adoptée dans l'exemple ci-dessous, bien qu'elle ne soit pas optimale pour une implémentation multiprogramée sur un seul ordinateur.
Le traitement séquentiel s'exprimera comme suit :
Les éléments en retrait s'expriment algorithmiquement comme une composition de fonctions. En notation UML :
En C++, une implémentation possible de lire_fichier() serait celle proposée à droite. Pour la comprendre, ce qu'il faut savoir :
Cette technique est très simple et très rapide. |
|
En C++, une implémentation possible de compter_mots() serait celle proposée à droite. Pour la comprendre, ce qu'il faut savoir :
Conséquemment, compter_mots() compte, tout simplement, tous les mots du flux sstr qui, lui, contient le texte de la string nommée s. Son type de retour est le type interne et public string::size_type, utilisé pour représenter le nombre d'éléments dans une string. |
|
Dans le programme de test, plus bas, vous verrez l'expression λ suivante, qui réalise concrètement la composition de ces deux fonctions :
// ...
auto fct = [](const string &nomfich) {
return compter_mots(lire_fichier(nomfich));
};
// ...
Pour en savoir plus sur la composition de fonctions, voir ceci. C'est cette opération qui sera exécutée par la version séquentielle comme par la version parallèle de notre algorithme, et ce sur chacun des fichiers à traiter.
Le traitement séquentiel en tant que tel s'exprimera comme suit :
// ...
template <class It, class F, class T>
T traiter_sequentiel(It debut, It fin, F fct, T init) {
return accumulate(debut, fin, init, [&](T cur, auto && val) {
return cur + fct(val);
});
}
La séquence standard (à demi-ouverte) de noms de fichiers à traiter est déterminée par debut et fin. La fonction à appliquer sur chaque fichier est fct, et la valeur initiale du cumul est init. Il suffit d'examiner l'expression λ un peu plus haut pour constater que, puisque fct(fich) retourne le nombre mots dans un fichier nommé fich donné, alors traiter_sequentiel(debut, fin, fct, 0) retournera la somme du nombre de mots dans tous les fichiers dont le nom apparaît dans l'intervalle .
Le programme de test que nous utiliserons ici sera :
// ...
#include <vector>
#include <string>
#include <chrono>
#include <iostream>
using namespace std;
using namespace std::chrono;
template <class F, class ... Args>
auto tester(F && f, Args &&... args) {
auto avant = system_clock::now();
auto res = f(std::forward<Args>(args)...);
auto apres = system_clock::now();
return make_pair(res, apres - avant);
}
int main(int argc, char *argv[]) {
auto fichiers = (argc == 1)? vector<string>(8000, "in.txt") : vector<string>(argv + 1, argv + argc);
auto fct = [](const string &nomfich) {
return compter_mots(lire_fichier(nomfich));
};
cout << fichiers.size() << " fichiers a traiter" << endl;
cout << endl;
cout << "Nombre de mots au total (calcul sequentiel) : ";
auto [r0, dt0] = tester(traiter_sequentiel, begin(fichiers), end(fichiers), fct, 0);
cout << r0 << "\n\tevalue en " << duration_cast<milliseconds>(dt0).count() << " ms." << endl;
for (int seuil = 256; seuil <= 4096; seuil *= 2) {
cout << "Nombre de mots au total (calcul parallele), seuil " << seuil << " : ";
auto [r1, dt1] = traiter_parallele(traiter_sequentiel, begin(fichiers), end(fichiers), fct, 0, seuil);
cout << r1 << "\n\tevalue en " << duration_cast<milliseconds>(dt1).count() << " ms." << endl;
}
}
Évidemment, la valeur 8000 et le contenu du fichier "in.txt" sont arbitraires : les deux ont été choisis (ou générés) dans le but de faire travailler le programme. Vous remarquerez qu'outre le seuil à partir duquel le calcul parallèle doit cesser de se subdiviser, les deux calculs se présentent de la même manière : capturer le moment présent avant et après le calcul, puis afficher le temps écoulé exprimé en millisecondes. Pour le calcul parallèle, divers seuils de subdivision sont utilisés. L'idée va comme suit :
Nous montrerons le temps d'exécution total, incluant celui de l'exécution séquentielle, pour trois implémentations distinctes de traitement_parallele() ci-dessous.
Une implémentation de traitement_parallele() basée sur des instances de std::thread prises en charge manuellement serait :
struct traiter_parallele {
template <class It, class F, class T>
T operator()(It debut, It fin, F fct, T init, int seuil) {
auto n = distance(debut, fin);
if (n <= seuil)
return traiter_sequentiel(debut, fin, fct, init);
auto p = next(debut, n / 2);
T r0; // resultat de th
thread th {
[&r0,fct,init,seuil](It debut, It fin) {
r0 = traiter_parallele{}(debut, fin, fct, init, seuil);
}, debut, p
};
auto r1 = traiter_parallele{}(p, fin, fct, init, seuil);
th.join();
return r0 + r1;
}
};
Pas à pas, cette implémentation procède comme suit :
À l'exécution du programme de test avec cette implémentation, j'obtiens :
8000 fichiers a traiter
Nombre de mots au total (calcul sequentiel) : 97992000
evalue en 28735 ms.
Nombre de mots au total (calcul parallele), seuil 256 : 97992000
evalue en 44784 ms.
Nombre de mots au total (calcul parallele), seuil 512 : 97992000
evalue en 45048 ms.
Nombre de mots au total (calcul parallele), seuil 1024 : 97992000
evalue en 44485 ms.
Nombre de mots au total (calcul parallele), seuil 2048 : 97992000
evalue en 31018 ms.
Nombre de mots au total (calcul parallele), seuil 4096 : 97992000
evalue en 22731 ms.
Appuyez sur une touche pour continuer...
Lorsque le seuil est petit, trop de threads sont lancés pour mon ordinateur, et ces threads se battent pour avoir accès au processeur. Conséquemment, la gestion des threads nuit à leur saine exécution. Ceci se voit du fait que les trois premiers seuils testés mènent à des résultats parallèles nettement moins bons que ceux d'une exécution purement séquentielle. Sur mon ordinateur à huit coeurs, cette implémentation amène rapidement tous les coeurs à travailler à .
La situation s'améliore un peu quand le seuil atteint 2048, puis le programme parallèle commence à donner de meilleurs résultats (de manière significative) que le code séquentiel une fois le seuil de 4096 atteint.
Cette implémentation fonctionne bien, mais a deux gros défaut : elle est un peu trop « bas niveau », nous forçant à utiliser une variable temporaire r0 passée par référence pour entreposer la valeur du calcul fait dans th, et elle ne gère pas les exceptions correctement si celles-ci sont levées dans le thread.
Une implémentation de traitement_parallele() basée sur un recours un peu naïf à std::async serait :
struct traiter_parallele {
template <class It, class F, class T>
T operator()(It debut, It fin, F fct, T init, int seuil) {
auto n = distance(debut, fin);
if (n <= seuil)
return traiter_sequentiel(debut, fin, fct, init);
auto p = next(debut, n / 2);
auto f0 = async(launch::async, traiter_parallele{}, debut, p, fct, init, seuil);
return f0.get() + traiter_parallele{}(p, fin, fct, init, seuil);
}
};
Pas à pas, cette implémentation procède comme suit :
À l'exécution du programme de test avec cette implémentation, j'obtiens :
8000 fichiers a traiter
Nombre de mots au total (calcul sequentiel) : 97992000
evalue en 27784 ms.
Nombre de mots au total (calcul parallele), seuil 256 : 97992000
evalue en 28206 ms.
Nombre de mots au total (calcul parallele), seuil 512 : 97992000
evalue en 28154 ms.
Nombre de mots au total (calcul parallele), seuil 1024 : 97992000
evalue en 28273 ms.
Nombre de mots au total (calcul parallele), seuil 2048 : 97992000
evalue en 28115 ms.
Nombre de mots au total (calcul parallele), seuil 4096 : 97992000
evalue en 28131 ms.
Appuyez sur une touche pour continuer...
Manifestement, cette implémentation a des vertus (elle est plus simple que la précédente), mais elle ne fonctionne pas, les résultats des exécutions « parallèles » s'apparentant aux résultats des exécutions « séquentielles » (avec un peu plus de temps consacré à la gestion de la mécanique). On le constate d'ailleurs en examinant le taux d'occupation des différents coeurs de notre ordinateur : un seul coeur est sollicité ici en pratique.
La raison est la suivante :
Cette situation un peu déplaisante s'avère, que nous ayons utilisé la politique de lancement par défaut pour notre async(), qui est de laisser le moteur choisir pour nous de réaliser un traitmenent parallèle ou séquentiel, ou que nous ayons, comme c'est le cas ici, explicitement demandé un lancement asynchrone.
Une implémentation de traitement_parallele() basée sur un recours correctement implémenté à std::async serait :
struct traiter_parallele {
template <class It, class F, class T>
T operator()(It debut, It fin, F fct, T init, int seuil) {
auto n = distance(debut, fin);
if (n <= seuil)
return traiter_sequentiel(debut, fin, fct, init);
auto p = debut;
advance(p, n / 2);
auto f0 = async(launch::async, traiter_parallele{}, debut, p, fct, init, seuil);
auto f1 = async(launch::async, traiter_parallele{}, p, fin, fct, init, seuil);
return f0.get() + f1.get();
}
};
La grosse différence avec la version trop naïve proposée plus haut est qu'ici, traiter_parallele() lance deux async() distinctes et se suspend (appel bloquant au get() des deux futures) en attendant que ceux-ci aient conclu leur tâche. Cette version occupe rapidement à pleine capacité tous les coeurs d'un ordinateur.
À l'exécution du programme de test avec cette implémentation, j'obtiens :
8000 fichiers a traiter
Nombre de mots au total (calcul sequentiel) : 97992000
evalue en 28476 ms.
Nombre de mots au total (calcul parallele), seuil 256 : 97992000
evalue en 45734 ms.
Nombre de mots au total (calcul parallele), seuil 512 : 97992000
evalue en 43602 ms.
Nombre de mots au total (calcul parallele), seuil 1024 : 97992000
evalue en 44239 ms.
Nombre de mots au total (calcul parallele), seuil 2048 : 97992000
evalue en 30915 ms.
Nombre de mots au total (calcul parallele), seuil 4096 : 97992000
evalue en 23122 ms.
Appuyez sur une touche pour continuer...
L'exécution de la version parallèle avec un seuil de 4096 s'exécute en du temps de la version séquentielle, un gain de de temps d'exécution. Visiblement, l'effort rapporte.