Échange de données synchronisé par un -tuple tampon

Le programme ci-dessous met en relief un exemple d'échange de données synchronisé entre deux threads à l'aide d'une zone de transit sans synchronisation. Nous représenterons ici cette zone de transit par un double tampon (ou -uplet, spécialisation d'un tampon -uplet) tel que pendant qu'un thread remplira l'un des tampons, l'autre tampon sera vidé par l'autre thread.

Ce schème quelque peu périlleux peut fonctionner si nous prenons soin d'offrir quelques garanties de cohérence séquentielle, sous la forme de variables atomiques, et si notre thread vidant les tampons (ici, le thread scripteur()) est toujours plus rapide que le thread remplissant les tampons (ici, le thread lecteur()). Des ralentissements occasionnels peuvent être compensés par l'ajout de tampons ou par l'accroissement de la taille des tampons (ces deux techniques sont équivalentes pour nos fins), du moins si le pire cas de ralentissement possible est connu a priori.

Une versions synchronisée, plus lente mais moins risquée, du code proposé sur cette page peut être examinée ici.

Programme de test

La liste des en-têtes auxquels nous aurons recours ici est indicative du fait que nous utiliserons les outils de C++ 11. En effet, tous nos en-têtes sont standards, et nous serons en mesure d'accomplir notre tâche en totalité sans avoir recours à des outils propres à une plateforme ou l'autre.

D'ailleurs, cette tâche sera, de manière concurrente :

  • De consommer un fichier texte, une ligne à la fois
  • D'écrire les mêmes données dans un autre fichier, par blocs de taille variable selon les circonstances
  • Le tout en passant par une zone de transit sur laquelle les accès ne sont pas synchronisés

Procéder sans synchronisation accélère l'exécution, mais introduit des pièges et rend l'écriture beaucoup plus subtile. Dans la mesure de possible, préférez un équivalent synchronisé au code présenté ici.

#include <iostream>
#include <fstream>
#include <string>
#include <string_view>
#include <atomic>
#include <future>
#include <vector>
#include <algorithm>
#include <utility>
#include <chrono>
#include <iterator>
#include <array>
using namespace std;
using namespace std::chrono;

La zone de transit que nous utiliserons sera générique sur la base :

  • Du type des valeurs qui y transiteront
  • Du nombre de tampons, et
  • De la capacité, exprimée en nombre d'éléments, des tampons

Dans notre cas, puisque nous consommerons du texte, le type sera char. La classe ntuple_buffer<T,N,M> sera le cas général d'un tampon -uplet dont les éléments sont de type T et dont chaque tampon a une capacité de M éléments, mais en pratique notre code se limitera à des double_buffer<T,M>, équivalents directs du type ntuple_buffer<T,2,M>.

Notez que j'ai choisi de fixer la capaciuté de chaque tampon à bufsize (4096 pour le moment), mais ce choix est purement arbitraire.

template <class, int, int>
   class ntuple_buffer;
template <class T, int N>
   using double_buffer = ntuple_buffer<T, 2, N>;
enum { bufsize = 4096 };

Le thread qui remplira la zone de transit sera représenté par la fonction lecteur(), alors que le thread qui videra cette zone sera représenté par la fonction scripteur().

Remarquez les paramètres qualifiés volatile, en particulier le booléen fini qui, de plus, est qualifié atomic. Nous y reviendrons.

void lecteur(
   string_view nom,
   volatile double_buffer<char, bufsize> &dest,
   atomic<bool> &fini
);
void scripteur(
   volatile double_buffer<char, bufsize> &src,
   string_view nom,
   atomic<bool> &fini
);

La fonction append_newline() est dispendieuse avec C++ 03 mais l'est beaucoup moins avec C++ 11, dû à la sémantique de mouvement, qui s'applique ici à la variable anonyme résultant de la concaténation de s avec "\n".

string append_newline(const string &s) {
   return s + "\n";
}

La définition d'un tampon -uplet reposera sur les états suivants :

  • Un tableau de tableaux de T, nommé ici bufs_, contenant les données en cours de transit
  • Deux compteurs, read_buf et write_buf, indiquant respectivement l'indice du tampon en cours de vidage et l'indice du tampon en cours de remplissage, et
  • Un compteur, cur_write, indiquant la position où se produira la prochaine écriture dans le tampon à l'indice write_buf

J'aurais pu utiliser des tableaux bruts, mais une instance de la classe array offre les mêmes seuils de performance, occupe le même espace en mémoire, et offre des services supplémentaires.

L'implémentation proposée ici repose sur les outils de C++ 11, et expose une interface volatile vouée aux situations multiprogrammées de même qu'une interface non-volatile axée sur la vitesse. Dans ce cas, en l'absence de tout mécanisme de synchronisation, les deux interfaces joueront le même rôle (on pourrait dire que la marque volatile est décorative, sans plus).

La chose à ne pas faire est d'offrir un service permettant d'ajouter un seul T à la fois : le coût de la synchronisation serait alors trop gros en proportion du coût du traitement qu'il protège., Il est toujours possible d'ajouter un objet à la fois (p. ex. : zt.ajouter(&obj, &obj+1);) mais il ne faut pas encourager des comportements inefficaces en les rendant faciles d'accès.

La clé de cette classe est le fait que write_buf est atomique. Pour cette raison, le compilateur et le processeur ne sont pas autorisés à déplacer les moments où les accès en lecture et en écriture à cette donnée sont faits dans le programme. Le caractère inamovible des accès à cette donnée s'avère essentiel, du fait que modifier sa valeur implique signaler à un autre thread du droit d'accès.

template <class T, int NBUFS, int SZBUF>
   class ntuple_buffer {
      using this_type = ntuple_buffer<T, NBUFS, SZBUF>;
      array<array<T,SZBUF>,NBUFS> bufs;
   public:
      using bufndx_t = typename
         array<array<T,SZBUF>,NBUFS>::size_type;
      usinh ndx_t = typename
         array<T,SZBUF>::size_type;
      static bufndx_t next_buffer(bufndx_t n) {
         return n == NBUFS - 1? 0 : n + 1;
      }
      static bufndx_t prev_buffer(bufndx_t n) {
         return n? n - 1 : NBUFS - 1;
      }
   private:
      atomic<bufndx_t> read_buf{0}, write_buf{0};
      atomic<ndx_t> cur_write{0};
   public:
      ntuple_buffer() = default;
      template <class C>
         void ajouter(const C &src) {
            ajouter(begin(src), end(src));
         }
      template <class It>
         void ajouter(It debut, It fin) {
            while (debut != fin) {
               auto n = distance(debut, fin);
               auto m = bufs_[write_buf].size() - cur_write;
               auto ntoadd = min<decltype(n)>(m,n);
               copy(debut, debut + ntoadd, begin(bufs[write_buf]) + cur_write);
               cur_write += ntoadd;
               advance(debut, ntoadd);
               if (cur_write == bufs[write_buf].size()) {
                  write_buf = next_buffer(write_buf);
                  cur_write = {};
               }
            }
         }
      vector<T> extraire() {
         vector<T> resultat;
         while(read_buf != write_buf) {// ICI: ambitieux
            resultat.insert(end(resultat), begin(bufs[read_buf]), end(bufs[read_buf]));
            read_buf = next_buffer(read_buf);
         }
         return resultat;
      }
      vector<T> extraire_tout() {
         auto resultat = extraire();
         resultat.insert(end(resultat), begin(bufs[write_buf]), begin(bufs[write_buf]) + cur_write);
         return resultat;
      }
      template <class C>
         C extraire() volatile {
            auto res = extraire();
            return { begin(res), end(res) };
         }
      template <class C>
         C extraire_tout() volatile {
            auto res = extraire_tout();
            return { begin(res), end(res) };
         }
      template <class C>
         void ajouter(const C &src) volatile {
            const_cast<this_type&>(*this).ajouter(src);
         }
      template <class It>
         void ajouter(It debut, It fin) volatile {
            const_cast<this_type&>(*this).ajouter(debut, fin);
         }
      vector<T> extraire() volatile {
         return const_cast<this_type&>(*this).extraire();
      }
      vector<T> extraire_tout() volatile {
         return const_cast<this_type&>(*this).extraire_tout();
      }
   };

Reste maintenant à examiner le code de test, et à voir comment les threads qui se partageront la zone de transit sont construits. Nous examinerons d'abord le code de test en tant que tel, qui se déclinera en trois temps :

La fonction de test minuter(f,ntests) accepte une opération nullaire f de type F et l'appelle ntests fois, cumulant la somme des temps écoulés et retournant cette somme. C'est une approche simpliste et éminemment perfectible, mais elle suffira pour nos fins.

template <class F>
   auto minuter(F fct) {
      auto avant = high_resolution_clock::now();
      fct();
      return high_resolution_clock::now() - avant;
   }
template <class F>
   auto minuter(F fct, int ntests) {
      high_resolution_clock::duration total {};
      for(int i = 0; i < ntests; ++i)
         total += minuter(fct);
      return total;
   }

La fonction nb_lignes() consomme tous les bytes d'un fichier nommé fich à l'aide d'un istreambuf_iterator et compte le nombre d'occurrences de bytes représentant un saut de ligne. Il est présumé ici que fich soit un fichier texte, bien que ce fichier soit ouvert en mode binaire pour que tous les bytes y soient traités équitablement.

std::streamsize nb_lignes(const string &fich) {
   ifstream in{ fich, ios::binary };
   return count(
      istreambuf_iterator<char>{ in },
      istreambuf_iterator<char>{},
      '\n'
   ) + 1;
}

Le programme principal lance les tests en procédant comme suit :

  • Il calcule le nombre de lignes du fichiers source à traiter
  • Il lance la fonction minuter() en lui passant une λ qui, elle, lance deux threads se partageant une zone de transit, et attendant que les deux aient complété leur travail, et
  • Il affiche le temps écoulé pour la somme de tous ces tests

À noter :

  • La zone de transit est qualifiée volatile, pour faire en sorte que les services synchronisés soient ceux utilisés par ses clients
  • Un booléen atomique est utilisé pour signaler la fin de l'exécution du thread alimentant la zone de transit, et par conséquent la fin du thread qui videra cette dernière
  • Il est essentiel de passer par une atomique ici du fait que le moment où l'un des threads modifiera son état constitue un signal passé d'un thread à l'autre, et ainsi qu'il faut empêcher le compilateur et le processeur de déplacer cette instruction en cherchant à optimiser l'exécution du programme – une telle optimisation briserait alors la cohérence séquentielle du code
  • Enfin, par défaut, la fonction async() prendrait ses paramètres par valeur. Pour que zt et fini soient relayés par référence au code des threads, nous utilisons le type std::ref<T> qui se convertira en T& à l'usage. Il existe aussi un std::cref<T> qui, lui, se convertirait en const T&

Les threads de ce programme sont très simples :

  • Le thread lecteur, nommé lect, ouvre le fichier nommé nom, en consomme chaque ligne et l'ajoute (suffixée d'un saut de ligne, du fait que getline() consomme du texte jusqu'à un saut de ligne sans conserver ce dernier) et l'ajout au double tampon dest. Une fois le fichier complètement consommé, lect modifie l'état de fini pour signaler au thread scripteur, nommé script, qu'il ne l'alimentera plus en données désormais
  • Ce dernier (script) consomme le texte du double tampon src sous forme d'une string (on aurait aussi pu utiliser des vector<char> ici), consommant chaque fois la totalité des données qui s'y trouvent, et ajoute ce texte au fichier nommé nom. Une fois le signal fini constaté, ce thread consomme une dernière fois les données de src au cas où un dernier remplissage lui aurait échappé, ce qui est tout à fait possible (pour ne poas dire probable) ici. Notez le passage par extraire_tout(), du fait que la logique de la méthode extraire() fait en sorte d'éviter que la lecture et l'écriture ne se fassent sur le même tampon – ici, il importe de rejoindre le point d'écriture avec le point de lecture, ce qui est correct du fait que, par définition, l'écriture est alors terminée.
int main() {
   const char *in = "data.txt",
              *out = "data.out";
   const auto nlignes = nb_lignes(in);
   enum { ntests = 10 };
   auto ecoule = minuter([&]() -> void {
      volatile atomic<bool> fini = false;
      volatile double_buffer<char, bufsize> zt;
      auto lect = async([nom = in, &dest = zt, &fini] {
         ifstream in{ nom };
         for (string ligne; getline(in, ligne); )
            dest.ajouter(append_newline(ligne));
         fini = true;
      });
      auto scrip = async([nom = out, &src = zt, &fini]() {
         ofstream out{ nom };
         while (!fini) {
            auto texte = src.extraire<string>();
            if (!texte.empty())
               out << texte;
            this_thread::sleep_for(1ms);
         }
         // restants possibles
         auto texte = src.extraire_tout<string>();
         if (!texte.empty())
            out << texte;
      });
      scrip.wait();
      lect.wait();
   }, ntests);
   cout << "Temps ecoule: " << duration_cast<milliseconds>(ecoule).count() << " ms. pour "
        << ntests << " tests sur un fichier de " << nlignes << " lignes\n\tmoyenne: "
        << static_cast<double>(duration_cast<milliseconds>(ecoule).count())/ ntests << " ms. par test" << endl;
}

Un résultat possible de l'exécution de ce programme de test serait :

Temps ecoule: 327 ms. pour 10 tests sur un fichier de 100001 lignes
        moyenne: 32.7 ms. par test

En espérant que le tout vous soit utile...


Valid XHTML 1.0 Transitional

CSS Valide !