Signaler un thread

Il arrive qu'un thread doive se mettre en attente de l'occurrence d'un événement, événement qui sera signalé par un autre thread. Dans un tel cas, en C++ (depuis C++ 11), la mécanique privilégiée est le recours à une condition_variable, comme dans l'exemple (simpliste) suivant :

#include <condition_variable>
#include <thread>
#include <iostream>
#include <future>
#include <thread>
#include <string>
#include <fstream>
#include <mutex>
using namespace std;
int main()
{
   mutex m;
   condition_variable cond;
   bool pret = {};
   vector<string> texte;
   thread fournisseur {
      [&]() {
         ifstream in{ "in.txt" };
         for (string s; getline(in, s); )
            texte.emplace_back(s);
         pret = true;
         cond.notify_one(); // le vecteur est rempli
      }
   },
   consommateur {
      [&]() {
         unique_lock<mutex> verrou{ m };
         cond.wait(verrou, [&] { return pret; });
         for (auto & s : texte)
            cout << s << endl;
      }
   };
   fournisseur.join();
   consommateur.join();
}

Cet exemple est trop simple pour utiliser de manière utile le signal passant par une condition_variable : il fait toute la lecture d'un côté, puis tout l'affichage de l'autre, ce qui fait de ce programme un programme multiprogrammé... mais où les threads s'exécutent en séquence plutôt qu'en parallèle.

Notez par contre qu'il pourrait être pertinent d'appliquer le même principe dans un cas où le fournisseur informerait le consommateur chaque fois que de nouvelles données deviennent disponibles.

Reprenons l'exemple plus haut mais dans une optique moins naïve, où le thread producteur rendra disponible une le texte d'une séquence de fichiers textes, un à la fois, et où le thread consommateur affichera le texte de chacun des fichiers, un à la fois lui aussi.

Nous utiliserons ici, pour signaler des événements entre nos deux threads :

  • Une condition_variable pour signaler la publication du texte d'un fichier par le producteur, et
  • Un atomic<bool> pour signaler l'exhaustion, chez le producteur, des données sources (signaler au consommateur qu'une fois les données déjà rendues disponibles consommées, il n'y en aura plus d'autres)
#include <condition_variable>
#include <thread>
#include <iostream>
#include <future>
#include <thread>
#include <string>
#include <fstream>
#include <mutex>
#include <atomic>
#include <deque>
using namespace std;

Pour le transfert des données du producteur au consommateur, nous utiliserons une zone de transit, mais adaptée pour que l'ajout d'éléments se fasse un fichier à la fois et qu'il en soit de même pour la consommation de fichiers.

Puisque nous ajouterons à la fin mais nous consommerons du début, le substrat de notre zone de transit sera un std::deque. Notez qu'un std::queue aurait aussi été convenable.

//...
class zone_transit
{
   deque<string> texte;
   mutex m;
public:
   void ajouter(const string &s)
   {
      lock_guard<mutex> _{ m };
      texte.emplace_back(s);
   }
   string extraire()
   {
      lock_guard<mutex> _{ m };
      if (texte.empty()) return {};
      auto s = texte.front();
      texte.pop_front();
      return s;
   }
};

Le programme principal mettra en interaction deux threads, soit un producteur et un consommateur. Le producteur lira le contenu d'une séquence de fichiers textes et les placera dans une zone de transit, alors que le consommateur extraira les données de chaque fichier de cette zone de transit pour les afficher à la sortie standard.

Les fichiers sont dans la liste fichiers, qui est implicitement un initializer_list<const char*>.

Ici, un booléen rendant disponible ou non des données ne suffira pas pour synchroniser les échanges entre le producteur et le consommateur. En effet, si le producteur est plus rapide que le consommateur, il deviendrait impossible pour le consommateur de faire la différence entre « j'ai le contenu de deux fichiers à afficher » et « j'ai le contenu d'un seul fichier à afficher »; en pratique, le producteur mettrait sans doute à true le booléen pour signaler la production d'une donnée et le consommateur remettrait à false ce booléen après consommation, ce qui pourrait (dans le meilleur des mondes) forcer le producteur à bloquer en attente d'un changement d'état du booléen en question. Cette approche est clairement contre-productive (et dangereuse).

Il est préférable de travailler avec deux entiers :

  • L'un, que nous nommerons produits, qui sera incrémenté par le producteur lorsque le contenu d'un nouveau fichier sera rendu disponible
  • L'autre, que nous nommerons consommes, qui sera incrémenté par le consommateur lorsque le contenu d'un fichier aura été consommé

En pratique, la relation consommes <= produits sera un invariant, et consommes == produits signifiera que le consommateur est à jour : toutes les données rendues disponibles jusqu'ici ont été consommées, et il est raisonnable de se mettre en attente de la condition_variable.

Le producteur signalera la condition_variable à chaque nouvelle production, et le fera à nouveau une fois le signal de fin provoqué, pour éviter que le consommateur ne reste bloqué en attente des dernières données produites.

//...
int main()
{
   auto fichiers = { "a.txt", "b.txt", "c.txt" };
   mutex m;
   condition_variable cond;
   int produits = {}, consommes = {};
   atomic<bool> fini{};
   zone_transit zt;
   thread fournisseur {
      [&]() {
         for (auto &fich : fichiers)
         {
            string texte; 
            ifstream in{ fich };
            for (string s; getline(in, s); )
               texte += s + "\n";
            zt.ajouter(texte);
            ++produits;
            cond.notify_one(); // un fichier
         }
         fini = true;
         cond.notify_one();
      }
   },
   consommateur {
      [&]() {
         while (!fini)
         {
            unique_lock<mutex> verrou{ m };
            cond.wait(verrou, [&] { return consommes < produits; });
            auto s = zt.extraire();
            cout << s << endl;
            ++consommes;
         }
         while (consommes < produits)
         {
            auto s = zt.extraire();
            cout << s << endl;
            ++consommes;
         }
      }
   };
   fournisseur.join();
   consommateur.join();
}

Signal unidirectionnel à usage unique

Scott Meyers, dans Effective Modern C++ et sur son propre blogue (voir http://scottmeyers.blogspot.ca/2013/12/threadraii-thread-suspension-trouble.html et http://scottmeyers.blogspot.ca/2015/04/more-on-threadraii-and-thread-suspension.html pour un peu de mise en contexte), fait valoir qu'une alternative pertinente pour les signaux faits une seule fois, de type Fire and Forget, est d'utiliser un future<void>. À titre d'illustration, voici une comparaison d'un programme où plusieurs threads attendent un signal pour démarrer leur travail le signal de démarrage est donné par une condition_variable ou par une future<void>, respectivement.

Avec condition_variable Avec future<void>
#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
#include <condition_variable>
using namespace std;
int main()
{
   bool go = {};
   mutex m;
   condition_variable cv;
   vector<thread> v;
   for (int i = 0; i < 10; ++i)
      v.emplace_back(
         thread{[&cv, &go, &m, i]() {
            unique_lock<mutex> verrou{m};
            cv.wait(verrou, [&]() { return go; });
            cout << "Youppi, thread " << i << '!' << endl;
         }
      });
   char c;
   cout << "Appuyez sur une touche pour démarrer... " << endl;
   cin >> c;
   go = true;
   cv.notify_all();
   for (auto & th : v) th.join();
}
#include <thread>
#include <future>
#include <iostream>
#include <vector>
using namespace std;
int main()
{
   promise<void> signal;
   auto fut = signal.get_future().share();
   vector<thread> v;
   for (int i = 0; i < 10; ++i)
      v.emplace_back(
         thread{[&, i]() {
            fut.wait();
            cout << "Youppi, thread " << i << '!' << endl;
         }
      });
   char c;
   cout << "Appuyez sur une touche pour démarrer... " << endl;
   cin >> c;
   signal.set_value();
   for (auto & th : v) th.join();
}

Le premier constat à faire est que, pour le même effet, la version avec future<void> est plus courte. Plus à propos, la plupart des artéfacts des condition_variable en sont absents (recours à un mutex, à un unique_lock, gestion du risque de réveil intempestif, etc.). La mécanique va comme suit :

Dans le cas d'un signal unidirectionnel et qui ne sera émis qu'une seule fois, donc, les future<void> sont une option intéressante.


Valid XHTML 1.0 Transitional

CSS Valide !