Échange de données synchronisé par une zone de transit

Le programme ci-dessous met en relief un exemple d'échange de données synchronisé entre deux threads à l'aide d'une zone de transit synchronisée par voie de mutex.

Programme de test

La liste des en-têtes auxquels nous aurons recours ici est indicative du fait que nous utiliserons les outils de C++ 11. En effet, tous nos en-têtes sont standards, et nous serons en mesure d'accomplir notre tâche en totalité sans avoir recours à des outils propres à une plateforme ou l'autre.

D'ailleurs, cette tâche sera, de manière concurrente :

  • de consommer un fichier texte, une ligne à la fois;
  • d'écrire les mêmes données dans un autre fichier, par blocs de taille variable selon les circonstances;
  • le tout en passant par une zone de transit sur laquelle les accès sont synchronisés.

Cette synchronisation a un coût, mais ce coût est probablement moins lourd que vous ne l'auriez cru au préalable. Qu'il soit acceptable ou non variera selon les problèmes et les plateformes.

#include <iostream>
#include <fstream>
#include <string>
#include <atomic>
#include <future>
#include <mutex>
#include <vector>
#include <algorithm>
#include <utility>
#include <chrono>
#include <iterator>
using namespace std;
using namespace std::chrono;

La zone de transit que nous utiliserons sera générique sur la base du type des valeurs qui y transiteront.

Dans notre cas, puisque nous consommerons du texte, le type sera char

template <class>
   class zone_transit;

Il peut sembler tentant de faire transiter des string par la zone de transit, du fait que le texte original sera consommé une ligne à la fois, mais mieux vaut résister : extraire une ligne à la fois serait une contrainte abusive pour le consommateur, et utiliser une string comme substrat pour les données de la zone de transit imposerait un coût déraisonnable pour cette structure de données.

Prenez soin de distinguer ce qui importe pour chaque thread, et de réfléchir aux conséquences de vos choix architecturaux.

Le thread qui remplira la zone de transit sera représenté par la fonction lecteur(), alors que le thread qui videra cette zone sera représenté par la fonction scripteur().

Remarquez les paramètres qualifiés volatile, en particulier le booléen fini qui, de plus, est qualifié atomic. Nous y reviendrons.

void lecteur(
   const string &nom, volatile zone_transit<char> &dest, volatile atomic<bool> &fini
);
void scripteur(
   volatile zone_transit<char> &src, const string &nom, volatile atomic<bool> &fini
);

La fonction append_newline() est dispendieuse avec C++ 03 mais l'est beaucoup moins avec C++ 11, dû à la sémantique de mouvement, qui s'applique ici à la variable anonyme résultant de la concaténation de s avec "\n".

string append_newline(const string &s)
   { return s + string{"\n"}; }

La définition d'une zone de transit reposera sur deux états, soit :

  • un vecteur de T, nommé ici data, contenant les données (FIFO) en cours de transit; et
  • un mutex, nommé ici mut, permettant de synchroniser les accès sur data.

L'implémentation proposée ici repose sur les outils de C++ 11, et expose une interface volatile vouée aux situations multiprogrammées de même qu'une interface non-volatile axée sur la vitesse. Comme c'est la coutume, notre interface volatile procédera en deux temps, soit assurer la synchronisation des opérations par un mutex et déléguer le travail à proprement dit vers un service de l'interface non-volatile.

La chose à ne pas faire est d'offrir un service permettant d'ajouter un seul T à la fois : le coût de la synchronisation serait alors trop gros en proportion du coût du traitement qu'il protège., Il est toujours possible d'ajouter un objet à la fois (p. ex. : zt.ajouter(&obj, &obj+1);) mais il ne faut pas encourager des comportements inefficaces en les rendant faciles d'accès.

Observez la méthode verrouiller(). Elle est volatile, encapsule les transtypages requis pour capturer le mutex, et retourne un unique_lock possédant le mutex en question à qui souhaite l'utiliser à travers un mouvement (une copie étant bien sûr illégale ici). Ceci permet d'alléger le code client (interne à la classe), et ce en toute sécurité puisque le destructeur du unique_lock en question libérera le mutex quoiqu'il advienne.

Portez aussi attention à la méthode extraire(), qui s'exécute en temps constant (complexité ) et pourrait être qualifiée noexcept dans la mesure où le constructeur par défaut d'un vector<T> et son constructeur de mouvement sont tous deux de complexité constante et noexcept.

template <class T>
   class zone_transit
   {
      mutex mut;
      vector<T> data;
      unique_lock<mutex> verrouiller() volatile
      {
         unique_lock<mutex> lock{const_cast<mutex&>(mut)};
         return std::move(lock);
      }
   public:
      template <class C>
         void ajouter(const C &src)
            { ajouter(begin(src), end(src)); }
      template <class Itt>
         void ajouter(Itt debut, Itt fin)
            { data.insert(end(data), debut, fin); }
      vector<T> extraire()
      {
         using std::swap;
         vector<T> resultat;
         swap(resultat, data);
         return std::move(resultat);
      }
      template <class C>
         C extraire() volatile
         {
            vector<T> res = extraire();
            return C(begin(res), end(res));
         }
      template <class C>
         void ajouter(const C &src) volatile
         {
            auto lock = verrouiller();
            const_cast<zone_transit<T>&>(*this).ajouter(src);
         }
      template <class Itt>
         void ajouter(Itt debut, Itt fin) volatile
         {
            auto lock = verrouiller();
            const_cast<zone_transit<T>&>(*this).ajouter(debut, fin);
         }
      vector<T> extraire() volatile
      {
         auto lock = verrouiller();
         return const_cast<zone_transit<T>&>(*this).extraire();
      }
   };

Reste maintenant à examiner le code de test, et à voir comment les threads qui se partageront la zone de transit sont construits. Nous examinerons d'abord le code de test en tant que tel, qui se déclinera en trois temps :

La fonction de test minuter(f,ntests) accepte une opération nullaire f de type F et l'appelle ntests fois, cumulant la somme des temps écoulés et retournant cette somme. C'est une approche simpliste et éminemment perfectible, mais elle suffira pour nos fins.

Si vous souhaitez raffiner le tout, envisagez entre autres :

  • une capture RAII du temps écoulé; et
  • une horloge choisie par le code client.
template <class F>
   system_clock::duration minuter(F fct, int ntests)
   {
      system_clock::duration total {};
      for(int i = 0; i < ntests; ++i)
      {
         auto avant = system_clock::now();
         fct();
         total += system_clock::now() - avant;
      }
      return total;
   }

La fonction nb_lignes() consomme tous les bytes d'un fichier nommé fich à l'aide d'un istreambuf_iterator et compte le nombre d'occurrences de bytes représentant un saut de ligne. Il est présumé ici que fich soit un fichier texte, bien que ce fichier soit ouvert en mode binaire pour que tous les bytes y soient traités équitablement.

std::streamsize nb_lignes(const string &fich)
{
   return count(
      istreambuf_iterator<char>{ifstream{fich, std::ios::binary}},
      istreambuf_iterator<char>{}, '\n'
   ) + 1;
}

Le programme principal lance les tests en procédant comme suit :

  • il calcule le nombre de lignes du fichiers source à traiter;
  • il lance la fonction minuter() en lui passant une λ qui, elle, lance deux threads se partageant une zone de transit, et attendant que les deux aient complété leur travail; et
  • il affiche le temps écoulé pour la somme de tous ces tests.

À noter :

  • la zone de transit est qualifiée volatile, pour faire en sorte que les services synchronisés soient ceux utilisés par ses clients;
  • un booléen atomique est utilisé pour signaler la fin de l'exécution du thread alimentant la zone de transit, et par conséquent la fin du thread qui videra cette dernière;
  • il est essentiel de passer par une atomique ici du fait que le moment où l'un des threads modifiera son état constitue un signal passé d'un thread à l'autre, et ainsi qu'il faut empêcher le compilateur et le processeur de déplacer cette instruction en cherchant à optimiser l'exécution du programme – une telle optimisation briserait alors la cohérence séquentielle du code; enfin,
  • par défaut, la fonction async() prendrait ses paramètres par valeur. Pour que zt et fini soient relayés par référence au code des threads, nous utilisons le type std::ref<T> qui se convertira en T& à l'usage. Il existe aussi un std::cref<T> qui, lui, se convertirait en const T&.

 

 int main()
{
   const char *in = "data.txt",
              *out = "data.out";
   const auto nlignes = nb_lignes(in);
   enum { ntests = 10 };
   auto ecoule = minuter([&]() -> void {
      volatile atomic<bool> fini = false;
      volatile zone_transit<char> zt;
      auto lect = async(lecteur, string(in), std::ref(zt), std::ref(fini));
      auto scrip = async(scripteur, std::ref(zt), string(out), std::ref(fini));
      scrip.wait();
      lect.wait();
   }, ntests);
   cout << "Temps ecoule: " << duration_cast<milliseconds>(ecoule).count()
        << " ms. pour " << ntests
        << " tests sur un fichier de " << nlignes << " lignes\n\tmoyenne: "
        << static_cast<double>(duration_cast<milliseconds>(ecoule).count())/ ntests
        << " ms. par test" << endl;
}

Les threads de ce programme sont très simples.

Le thread lecteur(nom,dest,fini) ouvre le fichier nommé nom, en consomme chaque ligne et l'ajoute (suffixée d'un saut de ligne, du fait que getline() consomme du texte jusqu'à un saut de ligne sans conserver ce dernier) à la zone de transit dest.

Une fois le fichier complètement consommé, le thread modifie l'état de fini pour signaler au thread scripteur() qu'il ne l'alimentera plus en données désormais.

void lecteur(
   const string &nom, volatile zone_transit<char> &dest, volatile atomic<bool> &fini
)
{
   ifstream in{nom;
   for (string ligne; getline(in, ligne); )
      dest.ajouter(append_newline(ligne));
   fini = true;
}

Enfin, le thread scripteur(src,nom,fini) consomme le texte de la zone de transit src sous forme d'une string (on aurait aussi pu utiliser des vector<char> ici), consommant chaque fois la totalité des données qui s'y trouvent, et ajoute ce texte au fichier nommé nom.

Une fois le signal fini constaté, ce thread consomme une dernière fois les données de src au cas où un dernier remplissage lui aurait échappé, ce qui est tout à fait possible (pour ne poas dire probable) ici.

void scripteur(
   volatile zone_transit<char> &src, const string &nom, volatile atomic<bool> &fini
 )
{
   ofstream out{nom};
   while (!fini)
   {
      auto texte = src.extraire<string>();
      if (!texte.empty())
         out << texte;
   }
   // restants possibles
   auto texte = src.extraire<string>();
   if (!texte.empty())
      out << texte;
}

Un résultat possible de l'exécution de ce programme de test serait :

Temps ecoule: 900 ms. pour 10 tests sur un fichier de 100001 lignes
        moyenne: 90 ms. par test

Valid XHTML 1.0 Transitional

CSS Valide !