Tâches morcelables

Dans un STR où une tâche doit démarrer à intervalles réguliers ou constants (une tâche périodique), il est typique qu'il reste un temps rédisuel après la complétion de l'exécution de cette tâche et avant le démarrage de son prochain cycle. Par exemple, si une tâche doit s'exécuter à un rythme de , il importe que son exécution se complète dans un temps tel que . La part résiduelle du cycle est donc .

Il est commun qu'un fil d'exécution se suspende pour le temps résiduel , or ce temps résiduel peut souvent être utilisé à des fins plus pertinentes, par exemple pour exécuter des tâches auxiliaires susceptibles d'améliorer la fluidité ultérieure du programme.

C'est dans cet esprit que se présente la posibilité de morceler une tâche.

Exemple concret, pas à pas

Supposons un algorithme standard tel que std::transform(). Une manière de l'exprimer serait :

template <class IIt, class OIt, class F> // IIt : input iterator; OIt : output iterator; F : fonction
   void transform(IIt ds, IIt fs, OIt dd, F f) {
      for(; ds != fs; ++ds, (void) ++dd)
         *dd = f(*ds);
   }

Cet algorithme a quelques préconditions : la paire forme un intervalle légal en entrée; f est applicable à *ds et f(*ds) peut être affecté à *dd; il y a au moins std::distance(ds,fs) éléments dans lesquels il est possible d'écrire à partir de dd; etc. Présumant que s'exécute dans un temps dont le pire cas est connu a priori, la complexité algorithmique de std::transform(ds,fs,dd,f) est , donc , donc . Clairement, sur cette base, le temps d'exécution de la fonction dépend linérairement du nombre d'éléments dans la séquence en entrée. Pour cette raison, appeler cette fonction dans la part résiduelle d'un cycle peut faire en sorte que l'algorithme appelant ne rencontre pas ses contraintes de temps (pour une fonction f donnée) si la séquence à traiter est trop longue.

Une version morcelable du même algorithme aura les propriétés suivantes :

Cela signifie qu'une version morcelable d'un algorithme comme std::transform() pourrait s'exprimer comme suit :

template <class IIt, class OIt, class F /* ... */>
   std::pair<IIt, OIt> transform_morcelable(IIt ds, IIt fs, OIt dd, F f, /* ... */) {
      for(; ds != fs && /* ... */; ++ds, (void) ++dd)
         *dd = f(*ds);
      return { ds, dd };
   }

... alors qu'un code client naïf opérant sur un vector<int> et produisant un vector<X> pour un certain type X pourrait s'exprimer comme suit (ici, nous simplifions le problème en présumant que suite au traitement, v.size()==res.size() s'avèrera) :

// ...
vector<int> v = // ...
// ...
vector<X> res;
res.reserve(v.size());
for(auto p = begin(v); !fini;) {
   // ... traitement TR essentiel
   // ... évaluer le temps résiduel disponible
   auto [ds,dd] = transform_morcelable(p, end(v), back_inserter(res), /* ... */);
   p = ds;
}

Ce code est naïf du fait qu'il ne permet pas de passer à un autre bloc de données à traiter une fois la première incarnation de v pleinement traitée, mais donne un aperçu de ce qu'il faudrait faire en pratique.

Choisir une modalité d'interruption

Plusieurs modalités d'interruption sont possibles. L'une d'elles se base sur le nombre d'itérations à exécuter, estimé dans ce cas par le code client :

template <class IIt, class OIt, class F>
   auto transform_morcelable(IIt ds, IIt fs, OIt dd, F f, int nit) {
      for(int n = 0; ds != fs && n < nit; ++ds, (void) ++dd, ++n)
         *dd = f(*ds);
      return std::pair{ ds, dd };
   }

Une autre se base sur le dépassement d'une échéance fixée par le code client (qui devrait tenir compte du coût estimé d'une itération de l'algorithme pour éviter de déborder du temps alloué pour son propre cycle d'exécution). Dans l'exemple ci-dessous, cette échéance est représentée par un délai dt suivant l'appel de la fonction :

template <class IIt, class OIt, class F, class T>
   auto transform_morcelable(IIt ds, IIt fs, OIt dd, F f, T dt) {
      auto t0 = system_clock::now();
      for(auto tf = t0 + dt; ds != fs && t0 < tf; ++ds, (void) ++dd, t0 = system_clock::now())
         *dd = f(*ds);
      return std::pair{ ds, dd };
   }

L'approche la plus simple est cependant de suppléer à la fonction appelée un prédicat de poursuite choisi par le code client. Ceci offre une souplesse maximale dans l'implémentation et découple la fonction morcelée des besoins réels du code client :

template <class IIt, class OIt, class F, class Pred>
   auto transform_morcelable(IIt ds, IIt fs, OIt dd, F f, Pred pred) {
      for(; ds != fs && pred(); ++ds, (void) ++dd)
         *dd = f(*ds);
      return std::pair{ ds, dd };
   }

À titre d'exemple, dans cette dernière approche, il est possible de modéliser les deux approches précédentes, plus spécifiques, par des foncteurs :

Approche comptabilisant les itérations Approche validant le non-dépassement d'une échéance
class CompterIterations {
   mutable int n = 0;
   int seuil;
public:
   CompterIterations(int seuil) : seuil{ seuil } {
   }
   bool operator()() const {
      return ++n < seuil;
   }
};
class RepecterEcheance {
   system_clock::time_point base = system_clock::now();
   system_clock::time_point echeance;
public:
   RespecterEcheance(system_clock::duration dt) {
      echeance = base + dt;
   }
   bool operator()() const {
      return system_clock::now() < echeance;
   }
};
Code client Code client
// ...
vector<int> v = // ...
// ...
vector<X> res;
res.reserve(v.size());
for(auto p = begin(v); !fini;) {
   // ... traitement TR essentiel
   auto rt = transform_morcelable(p, end(v), back_inserter(res); CompterIterations{ 10 }); // 10 itérations max
   p = re.first;
}
// ...
vector<int> v = // ...
// ...
vector<X> res;
res.reserve(v.size());
for(auto p = begin(v); !fini;) {
   // ... traitement TR essentiel
   auto rt = transform_morcelable(p, end(v), back_inserter(res); RespecterEcheancs{ 10ms }); // 10ms max
   p = re.first;
}

Voilà! Notez que ce sont des foncteurs munis d'états (Stateful), et qu'en particulier dans le cas du « compteur d'itérations », chaque appel à operator() comptabilise une itération, que ce soit le cas ou non : si on algorithme morcelable devait appeler plus d'une fois un tel foncteur dans une même itération, les résultats pourraient être faussés.

Morcelabilité, continuations et coroutines

Les continuations, les coroutines et les tâches morcelables sont des concepts extrêmement proches les uns des autres. Une continuation est un enchaînement de tâches asynchrones (la tâche est suivie de la tâche dans une séquence de tâches ), ce qui permet de réduire le blocage et d'accroître le parallélisme; la morcelabilité est la segmentation d'un traitement en sous-tâches, avec maintien des états intermédiaires du traitement en cours; une coroutine implémente l'idée-même de fonction résumable.

Examinons brièvement quelques manières d'atteindre le même résultat avec diverses approches (nous n'utiliserons pas de continuations dans cet exemple, le comportement associé divergeant quelque peu de notre objectif). Tous les exemples qui suivent auront le même comportement, soit afficher des entiers entre 0 et 9 inclusivement (parfois tous ces entiers, parfois certains d'entre eux seulement) avec un léger délai (approximativement 100 ms) entre chaque affichage.

Morcelabilité par état explicite

Une option est de représenter les états transitoires du calcul par une classe, donc de manière explicite.

Cet exemple (comme presque tous les autres) reposera strictement sur du code C++ 17 standard.

#include <thread>
#include <chrono>
#include <iostream>
using namespace std;
using namespace std::chrono;

Dans cette version, l'état sera modélisé par une simple classe contenant un entier.

struct State {
   int n {};
   State() = default;
   constexpr State(int n) : n{ n } {
   }
};

La tâche à faire sera modélisée par une paire de traitements (bidons) : l'un artificiellement long, qui représente une tâche critique, et un autre résiduel (la modification de l'état).

State critical_work(State st) {
   this_thread::sleep_for(100ms); // tâche difficile
   return st;
}
State residual_work(State st) {
   ++st.n;
   return st;
}

Le traitement morcelable est modélisé par la fonction processsing(st,pred) st est l'état tenu à jour d'une itération à l'autre, et où pred est un prédicat servant à titre de condition de poursuite.

template <class Pred>
   State processing(State st, Pred pred) {
      for(;;) {
         if(!pred()) break;
         st = critical_work(st);
         if(!pred()) break;
         st = residual_work(st);
      }
      return st;
   }

Le programme de test crée un prédicat distinct à chaque appel de la fonction processing(), et appelle cette fonction jusqu'à ce que l'état ait été incrémenté dix fois.

Notez que le prédicat utilisé ici fait en sorte que la condition de poursuite sera fausse dès son deuxième appel dans processing(), ceci pour faciliter l'affichage. Il s'agit d'une situation tout à fait artificielle.

int main() {
   auto make_pred = [] {
      return [deadline = high_resolution_clock::now() + 150ms]{
         return high_resolution_clock::now() < deadline;
      };
   };
   for (auto st = processing({}, make_pred()); st.n < 10; st = processing(st, make_pred()))
      cout << st.n << endl;
}

Une variante (simple) serait de modéliser l'état par un simple entier plutôt que par une classe :

Avec une classe Plus simplement
// ...
struct State {
   int n {};
   State() = default;
   constexpr State(int n) : n{ n } {
   }
};
State critical_work(State st) {
   this_thread::sleep_for(100ms); // tâche difficile
   return st;
}
State residual_work(State st) {
   ++st.n;
   return st;
}
// ...
using State = int;
State critical_work(State st) {
   this_thread::sleep_for(100ms); // tâche difficile
   return st;
}
State residual_work(State st) {
   ++st;
   return st;
}

Morcelabilité par interface

Dans un langage où la généricité serait plus restrictive (p. ex. : Java, C#) et exigerait l'implémentation d'interfaces spécifiques, il demeurerait bien sûr possible d'implémenter un système de tâches morcelables.

Cet exemple (comme presque tous les autres) reposera strictement sur du code C++ 17 standard.

#include <thread>
#include <chrono>
#include <iostream>
#include <functional>
#include <memory>
using namespace std;
using namespace std::chrono;

Dans notre cas, l'interface Morcelable exposera deux services :

  • Un service polymorphique init(), que les enfants pourront spécialiser s'ils souhaitent une initialisation propre à leur classe, et
  • Un service abstrait work(pred) qui acceptera en paramètre un prédicat modélisant une condition de poursuite et implémentera le calcul à réaliser
struct Morcelable {
   virtual void init() {}
   virtual void work(function<bool()>) = 0;
   virtual ~Morcelable() = default;
};

La tâche à faire sera modélisée par un traitement (bidon) artificiellement long, suivi d'un effet de bord observable (la modification de l'état). L'état du traitement est ici tenu à jour entre les appels dans une instance de la classe Incrementer, qui est un enfant du parent Morcelable.

Chaque appel à work(pred) avec un nouveau prédicat mènera à une nouvelle séquence de calculs morcelables, faisant progresser l'état tenu à jour par le fait-même.

class Incrementer : public Morcelable {
   int &n;
public:
   Incrementer(int &n) : n{ n } {
   }
   void work(function<bool()> pred) override {
      for(;;) {
         if(!pred()) return;
         // faire les tâches importantes
         this_thread::sleep_for(100ms);
         if(!pred()) return;
         // faire les tâches résiduelles
         ++n;
      }
   }
};

Le programme de test crée un prédicat distinct à chaque appel de la méthode p->work(), et appelle cette méthode  jusqu'à ce que l'état ait été incrémenté dix fois. Dans cet exemple, l'entier manipulé par l'instance d'Incrementer est local au programme de test, et est manipulé indirectement par ce dernier.

Notez que le prédicat utilisé ici fait en sorte que la condition de poursuite sera fausse dès son deuxième appel dans p->work(), ceci pour faciliter l'affichage. Il s'agit d'une situation tout à fait artificielle.

int main() {
   auto make_pred = [] {
      return [deadline = high_resolution_clock::now() + 150ms]{
         return high_resolution_clock::now() < deadline;
      };
   };
   int n{};
   unique_ptr<Morcelable> p{ make_unique<Incrementer>(n) };
   for (p->work(make_pred()); n < 10; p->work(make_pred()))
      cout << n << endl;
}

Une variante serait d'appliquer l'idiome NVI. Cet idiome consiste en l'exposition d'une méthode non-virtuelle publique dans le parent, qui appellera une méthode abstraite protégée (que les enfants implémenteront à leur guise), ce qui donne un point d'ancrage unique pour insérer une trace des appels, une validation des préconditions et des postconditions, et ainsi de suite. Certains appelleront cet idiome le Template Method, au sens où une fonction décrira les modalités d'appel des autres fonctions et encadrera leur exécution :

Sans NVI Avec NVI
// ...
struct Morcelable {
   virtual void init() {}
   virtual void work(function<bool()>) = 0;
   virtual ~Morcelable() = default;
};
class Incrementer : public Morcelable {
   int &n;
public:
   Incrementer(int &n) : n{ n } {
   }
   void work(function<bool()> pred) override {
      for(;;) {
         if(!pred()) return;
         // faire les tâches importantes
         this_thread::sleep_for(100ms);
         if(!pred()) return;
         // faire les tâches résiduelles
         ++n;
      }
   }
};
// ...
struct Morcelable {
   virtual void init() {}
   void work(function<bool()> pred) {
      for(;;) {
         if(!pred()) return;
         critical_work();
         if(!pred()) return;
         residual_work();
      }
   }
   virtual ~Morcelable() = default;
protected:
   virtual void critical_work() = 0;
   virtual void residual_work() {}
};
class Incrementer : public Morcelable {
   int &n;
   void critical_work() override {
      this_thread::sleep_for(100ms);
   }
   void residual_work() override {
      ++n;
   }
public:
   Incrementer(int &n) : n{ n } {
   }
};

Morcelabilité par voie de coroutine

Les coroutines, en tant que fonctions résumables, sont l'incarnation la plus directe de l'idée de tâche morcelable.

Cet exemple reposera pour l'essentiel sur du code C++ 17 standard, mais avec ajout du support des coroutines (qui ont été votées pour inclusion dans le standard à partir de C++ 20). Ceci explique l'inclusion d'en-têtes dans le dossier experimental, et le recours à un outil qui se trouve dans ceux-ci.

#include <thread>
#include <chrono>
#include <iostream>
#include <experimental/coroutine>
#include <experimental/generator>
using namespace std;
using namespace std::chrono;
using experimental::generator;

Dans cette version, l'état sera modélisé par un simple entier.

using State = int;

La tâche à faire sera modélisée par un traitement (bidon) artificiellement long, suivi d'un effet de bord observable (la modification de l'état). Notez le recours au type de retour generator<State>, ce qui permettra au code client d'itérer sur les valeurs retournées par les coroutines comme s'il s'agissait d'un conteneur (ou de toute autre entité susceeptible d'être parcourue à l'aide d'une paire d'itérateurs).

template <class Pred>
   generator<State> work(State st, Pred pred) {
      for (;;) {
         if(!pred()) co_yield st;
         this_thread::sleep_for(100ms); // tâche difficile
         if(!pred()) co_yield st;
         ++st; // tâche résiduelle
      }
   }

Dans le code de test, la nature d'une coroutine influencera le code utilisé pour l'appeler :

  • Les paramètres ne seront évalués qu'au premier appel. Pour cette raison, nous n'utiliserons pas ici de fabrique comme le make_pred() des exemples précédents, utilisant plutôt un prédicat dont l'échéance sera controlée par le code client
  • La répétitive dans le code client est potentiellement infinie, et opère en apparence sur une séquence. Pour cette raison, la répétitive est interrompue à l'interne (un break) lorsque la condition d'arrêt est rencontrée
  • La condition d'arrêt telle qu'exprimée fera en sorte que la coroutine incrémentera en général deux fois son état interne par itération de la boucle de test dans main(). Voyez-vous pourquoi?
int main() {
   auto deadline = high_resolution_clock::now() + 150ms;
   auto pred = [&deadline] {
      return high_resolution_clock::now() < deadline;
   };
   for (auto n : work({}, pred)) {
      if (n >= 10) break;
      cout << n << endl;
      deadline = high_resolution_clock::now() + 150ms;
   }
}

Valid XHTML 1.0 Transitional

CSS Valide !