Copie de fichier avec sockets de type datagramme (sockets UDP) – exemple de solution

Ce qui suit se veut un petit exemple de solution à un problème proposé informellement en classe, soit celui d'écrire un programme (inefficace, mais c'est pour se faire la main) qui consommera des données d'un fichier source à l'aide d'un fil d'exécution, et rendra progressivement ces données disponibles à d'autres à travers un socket de type datagramme à l'autre extrémité duquel un autre fil d'exécution les en consommera pour les écrire dans un autre fichier.

Conceptuellement, le code proposé se sépare en deux processus :

J'ai mis le tout dans un même processus pour simplifier la présentation, mais je vous invite bien sûr à finir le travail; cela impliquera un peu de copier/ coller, pas beaucoup plus de travail que ça.

J'ai proposé que vous acomplissiez cette tâche avec un fichier binaire quelconque, et le code ci-dessous fonctionnerait, avec quelques modifications, avec un .jpeg ou un .mp3, mais j'ai écrit l'exemple de manière à vous le rendre disponible rapidement alors ma version lit... le code source du programme (fichier Principal.cpp) pour l'écrire dans un autre fichier (sortie.txt), pour enfin en afficher le contenu à la console. Ceci permet de s'assurer qu'il ne manquera aucun caractère en fin de parcours (un réel piège ici).

Puisque les sockets ne sont pas représentés par des types standards de C++ mais bien par des bibliothèques locales aux plateformes, j'ai isolé le code des sockets derrière une interface simple inspirée de l'idiome pImpl. Conséquemment, le code ci-dessous se subdivise en quatre fichiers :

Si vous séparez le tout en deux processus, seul Principal.cpp sera affecté.

Fichier messages.h

Le fichier messages.h, comme indiqué plus haut, décrit le format d'un message dans ce programme. Par souci de similitude avec la version TCP d'un programme analogue, nous transférerons le contenu d'un fichier texte à travers un canal déterminé par une paire de sockets.

Toutefois, la métaphore UDP demande de passer par des enregistrements, pas par un flux de données, alors nous utiliserons des petits blocs représentant des lignes d'un fichier texte. Puisque le texte devra être encodé à même les messages, nous utiliserons un bloc de capacité fixe (un tableau), ce qui limitera la taille des lignes à cette capacité.

#ifndef MESSAGES_H
#define MESSAGES_H
#include <string>
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iosfwd>
#include <iterator>

Une ligne, donc, sera constituée d'un bloc de char, le tableau data, et d'une taille exprimée en nombre de caractères, soit l'attribut size. La capacité d'une ligne est déterminée par la constante CAPACITE_MAX, mais puisqu'il s'agit d'une constante de classe, cette information n'occupe pas d'espace dans une ligne donnée.

J'y ai implémenté un constructeur par défaut, représentant une ligne vide, et un constructeur paramétrique sur la base d'une std::string, qui valide l'absence de débordement de capacité et initialise une ligne avec le contenu de la std::string. Le constructeur par défaut permettra au code client de recevoir une ligne (les fonctions de réception destinées à un socket UDP doivent écrire dans une variable existante).

J'ai traité un débordement de capacité comme une erreur grave (volation d'une assertion dynamique), mais si vous estimez qu'il s'agit d'une erreur dont il est raisonnable de récupérer, vous pouvez bien sûr utiliser une levée d'exception ici.

struct ligne {
   using size_type = uint32_t;
   static constexpr const std::size_t CAPACITE_MAX = 1'024;
   char data[CAPACITE_MAX];
   size_type size {};
   ligne() = default;
   ligne(const std::string &s) : size{ s.size() } {
      assert(size <= CAPACITE_MAX);
      std::copy(std::begin(s), std::end(s), begin());
   }
   using iterator = char*;
   using const_iterator = const char*;
   iterator begin() {
      return std::begin(data);
   }
   iterator end() {
      return begin() + size;
   }
   const_iterator begin() const {
      return std::begin(data);
   }
   const_iterator end() const {
      return begin() + size;
   }
};

J'ai déterminé trois sortes de messages, identifiés par un code de type sorte_message. C'est une technique classique, qu'on applique souvent avec des union étiquetés.

enum class sorte_message {
   debut, ligne, fin
};

Un message en soi sera ici un enregistrement comprenant :

  • Un numéro de séquence, pour permettre au code client de s'assurer que les messages reçus sont assemblés dans un ordre respectant l'intention de l'émeteur
  • Une sorte de message, et
  • Une ligne, qui est en fait le contenu du message (le Payload)

Pour faciliter les tests, un opérateur de projection sur un flux pour un message a été écrit. Son implémentation, dans messages.cpp, va comme suit :

#include "messages.h"
#include <ostream>
#include <algorithm>
#include <iterator>
using namespace std;
ostream& operator<<(ostream &os, const ligne &ze_ligne) {
   copy(begin(ze_ligne), end(ze_ligne), ostream_iterator<char>{ os });
   return os;
}
struct message {
   int seq_id {};
   sorte_message sorte { sorte_message::debut };
   ligne ligne_;
   message() = default;
   message(int seq, sorte_message sorte) : seq_id{ seq }, sorte{ sorte } {
   }
   message(int seq, const ligne &lgn)
      : seq_id{ seq }, sorte{ sorte_message::ligne }, ligne_{ lgn }
   {
   }
};
std::ostream& operator<<(std::ostream&, const ligne&);
#endif

Fichier socket_udp.h

J'ai implémenté une couche de type pImpl sur les sockets de type datagramme pour que leur interface (le fichier socket_udp.h) soit pleinement portable, et pour éviter de trop faire fuir les détails de la plateforme sous-jacente dans le code client. Notez que le code proposé ici est du code écrit rapidement.

Le code des sockets de type datagramme proposé ici est logé dans l'espace nommé socket_udp.

#ifndef SOCKET_UDP_H
#define SOCKET_UDP_H
#include <memory>
#include <string>
#include <cstdint>
#include <exception>
#include <type_traits>
namespace socket_udp {

J'ai défini une classe incopiable nommée chargeur dont le constructeur chargera les sockets en mémoire et dont le destructeur assurera la finalisation correspondante. En pratique, le programme principal définira localement une instance de chargeur avant de commencer à manipuler des sockets.

Le chargement explicite des sockets est un artéfact de Microsoft Windows; sous Linux, le constructeur et le destruceur de cette classe seraient vides.

   struct chargeur {
      chargeur();
      ~chargeur();
      chargeur(const chargeur&) = delete;
      chargeur&operator=(const chargeur&) = delete;
   };

L'implémentation à proprement dit d'un socket est abstraite sous une déclarations a priori.

La classe pImpl que j'ai utilisé joue deux rôles :

  • Isoler l'implémentation derrière une classe a priori indéfinie (impl pour socket_impl), et
  • Assurer une gestion RAII de l'implémentation, à travers des unique_ptr.

Notez que j'ai implémenté la sémantique de mouvement, ce qui me permettra de manipuler mes objets un peu comme on manipulerait des classes concrètes ou des types primitifs, mais que mes classes sont incopiables en vertu de leur attribut qui est lui-même incopiable.

Notez aussi que les attributs de cette classe sont publics, mais que cela ne m'importe pas beaucoup ici puisque le type du pointé est inaccessible au code client, qui ne pourra à peu près rien faire avec lui.

   struct impl;
   struct socket_impl {
      std::unique_ptr<impl> p;
      socket_impl() = delete;
      socket_impl(std::unique_ptr<impl>&&);
      socket_impl(socket_impl&&);
      ~socket_impl();
   };

Pour faciliter la gestion des erreurs, j'ai défini une classe par type d'exception possible. Ne confondez pas ceci avec « une exception par code d'erreur possible », ce qui serait faux ici (il y a des tas de codes d'erreurs possibles pour chaque opération sur un socket). Je me suis intéressé à deux choses, pratiquement :

  • L'opération qui aura échoué, et
  • Le code d'erreur rapporté par l'implémentation sous-jacente

Normalement, je n'aurais pas utilisé std::exception ou des messages d'erreur pour chaque type, mais dans le code de test que j'ai écrit, j'ai limité le traitement des exceptions à un affichage suivi de la conclusion du fil d'exécution ayant détecté le problème. Cette stratégie plutôt naïve ne se veut pas de qualité industrielle, et se limite à rapporter les cas problèmes plutôt que de les gérer.

   struct erreur : std::exception {
      int code;
      erreur(const char*);
   };
   struct erreur_creation : erreur { erreur_creation() : erreur{"socket()"} {} };
   struct erreur_connexion : erreur { erreur_connexion() : erreur{"connect()"} {} };
   struct erreur_lier_port : erreur { erreur_lier_port() : erreur{"bind()"} {} };
   struct erreur_envoi : erreur { erreur_envoi() : erreur{"send()"} {}};
   struct erreur_reception : erreur { erreur_reception() : erreur{"recv()"} {}};

Enfin, j'ai exprimé les opérations exposées au code client dans des fonctions opérant sur mes classes pImpl, et faisant essentiellement abstraction des détails d'implémentation. À noter tout de même :

  • Le port est représenté par un entier non-signé sur 16 bits, dans le respect des règles sur Internet
  • Les fonctions de bas niveau pour envoyer et recevoir des données opèrent sur des trames de bytes bruts (une interface simple permettant d'alléger l'écriture du code client a été offerte pour envoyer_vers() et recevoir_de())
  • Puisque le code client est susceptible d'utiliser des entiers dans les trames, j'ai implémenté des fonctions pour normaliser et pour dénormaliser des entiers à même l'interface exposée par cet espace nommé

C'est une interface naïve mais opaque et utilisable.

// ...
   socket_impl creer(std::uint16_t port);
   int envoyer_vers(socket_impl&, const std::string &adr, std::uint16_t, const char*, int);
   int recevoir_de(socket_impl&, const std::string &adr, std::uint16_t, char *buf, int cap);
   template <class T>
      int envoyer_vers(socket_impl &sck, const std::string &adr, std::uint16_t port, const T &obj) {
         static_assert(std::is_trivially_copyable_v<T>);
         return envoyer_vers(sck, adr, port, reinterpret_cast<const char*>(&obj), sizeof(T));
      }
   template <class T>
      int recevoir_de(socket_impl &sck, const std::string &adr, std::uint16_t port, T &obj) {
         static_assert(std::is_trivially_copyable_v<T>);
         return recevoir_de(sck, adr, port, reinterpret_cast<char*>(&obj), sizeof(T));
      }
   void fermer(socket_impl);
   std::uint16_t normaliser(std::uint16_t);
   std::uint32_t normaliser(std::uint32_t);
   std::uint16_t denormaliser(std::uint16_t);
   std::uint32_t denormaliser(std::uint32_t);
}
#endif

Fichier socket_udp.cpp

Sans surprises, l'implémentation des types et des fonctions déclarées dans socket_udp.h repose en partie sur du code propre à la plateforme, donc éminemment non-portable. Le code proposé ici vaut pour Microsoft Windows, et peut bien entendu être adapté au besoin pour les autres plateformes. Considérez cela comme un exercice.

#include "socket_udp.h"
#include <WinSock2.h>
#include <WS2tcpip.h>
#include <memory>
#include <string>
#include <cstdint>
using namespace std;
#pragma comment (lib, "ws2_32.lib")
namespace socket_udp {

J'ai implémenté une classe parent nommée erreur pour mes cas d'exceptions, ceci dans l'optique d'alléger mon propre travail à peu de frais. Chaque erreur dérive de std::exception et lui relaie un bref message expliquant la nature de l'opération ayant échoué. Un code d'erreur logé à même erreur, implémenté sous forme de constante d'instance (initialisée à la construction) est exposé de manière publique et permet de comprendre plus en détail la nature du problème s'étant manifesté.

   erreur::erreur(const char *msg) : exception{ msg }, code{ WSAGetLastError() } {
   }

La classe chargeur réalise un chargement et un déchargement RAII du module de sockets de Microsoft Windows. Sous Linux, on laissera ces deux fonctions vides, tout simplement.

// ...
   chargeur::chargeur() {
      WSADATA data { };
      auto res = WSAStartup(MAKEWORD(2,2), &data);
   }
   chargeur::~chargeur() {
      WSACleanup();
   }

La classe impl isole le détail d'implémentation qu'est un socket pour la plateforme. J'y ai essentiellement implémenté les éléments constitutifs de la règle de cinq que j'estimais pertinents, la rendant entre autres incopiable, de même qu'une méthode pour transférer explicitrement un socket vers un tiers.

   struct impl {
      SOCKET sck;
      impl(SOCKET sck) : sck{ sck } {
      }
      impl(impl && other) : sck{ other.sck } {
         other.sck = INVALID_SOCKET;
      }
      impl(const impl&) = delete;
      impl& operator=(const impl&) = delete;
      ~impl() {
         if (sck != INVALID_SOCKET) {
            shutdown(sck, SD_BOTH);
            closesocket(sck);
         }
      }
      SOCKET transfer() {
         return std::exchange(sck, INVALID_SOCKET);
      }
   };

La classe pImpl servant à encapsuler un impl se limite à implémenter le mouvement, ce qui lui suffit.

   socket_impl::socket_impl(unique_ptr<impl> &&p) : p{ std::move(p) } {
   }
   socket_impl::socket_impl(socket_impl&&) = default;
   socket_impl::~socket_impl() = default;

La fonction creer_socket() est pour usage interne seulement, n'étant pas exposée dans socket_udp.h.

   socket_impl creer_socket() {
      auto sck = make_unique<impl>(socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP));
      if (sck->sck == INVALID_SOCKET) throw erreur_creation{};
      return { std::move(sck) };
   }

La fonction creer() retourne un socket capable d'envoyer et de recevoir des trames UDP. Elle encapsule les détails techniques reliés à une connexion bloquante par socket de type datagramme lié à un port spéficiés en paramètre.

   socket_impl creer(uint16_t port) {
      auto sck = creer_socket();
      sockaddr_in moi { };
      moi.sin_family = AF_INET;
      moi.sin_port = normalisr(port);
      auto res = bind(sck.p->sck, reinterpret_cast<const sockaddr*>(&moi), sizeof(moi));
      if (res == SOCKET_ERROR) throw erreur_lier_port{};
      const BOOL opt = TRUE;
      res = setsockopt(sck.p->sck, SOL_SOCKET, SO_BROADCAST, reinterpret_cast<const char *>(&opt), sizeof(opt));
      if (res == SOCKET_ERROR) throw erreur_lier_port{};
      return sck;
   }

La fonction envoyer_vers() émet sur un socket UDP une trame d'une certaine taille vers un destinataire identifié en paramètre, et retourne le nombre de bytes réellement envoyés.

   int envoyer_vers(socket_impl &sck, const string &adr, uint16_t port, const char *buf, int len) {
      sockaddr_in dest { };
      inet_pton(AF_INET, adr.c_str(), &dest.sin_addr.S_un.S_addr);
      dest.sin_port = normaliser(port);
      dest.sin_family = AF_INET;
      auto res = sendto(sck.p->sck, buf, len, 0, reinterpret_cast<const sockaddr*>(&dest), sizeof(dest));
      if (res == SOCKET_ERROR) throw erreur_envoi{};
      if (res < len) throw erreur_envoi{};
      return res;
   }

La fonction recevoir_de() reçoit sur un socket UDP une trame provenant d'un destinataire identifié par les paramètre, et retourne de nombre de bytes réellement reçus

   int recevoir_de(socket_impl &sck, const string &adr, uint16_t port, char *buf, int cap) {
      sockaddr_in src { };
      inet_pton(AF_INET, adr.c_str(), &src.sin_addr.S_un.S_addr);
      src.sin_port = normaliser(port);
      int len = sizeof(src);
      auto res = recvfrom(sck.p->sck, buf, cap, 0, reinterpret_cast<sockaddr*>(&src), &len);
      if (res == SOCKET_ERROR) throw erreur_reception{};
      return res;
   }

De manière amusante, la fonction fermer(), sans l'annoncer ouvertement, ne fait rien : c'est un exemple de ce qu'on nomme une fonction Sink, donc une fonction à qui l'on confie un paramètre qui ne ressortira plus.

Ici, le type accepté ne peut être passé que par mouvement, et est un type RAII, ce qui fait qu'on n'a même pas à nommer les paramètres : le simple fait de les recevoir localement mènera à leur éventuelle destruction, et à leur finalisation en propre.

   void fermer(socket_impl) { // sink
   }

Enfin, les fonctions de normalisation et de dénormalisation permettent principalement d'isoler le code client des détails d'implémentation de ces macros, et des en-têtes qui les définissent. Ce sont des fonctions pour le moins triviales.

   uint16_t normaliser(uint16_t n) { return htons(n); }
   uint32_t normaliser(uint32_t n) { return htonl(n); }
   uint16_t denormaliser(uint16_t n) { return ntohs(n); }
   uint32_t denormaliser(uint32_t n) { return ntohl(n); }
}

Fichier Principal.cpp (naïf)

Remarquez tout d'abord que le code proposé ici est totalement portable à toute plateforme, dans la mesure où un compilateur C++ 11 y est disponible (et dans la mesure où socket_udp.h est portable, bien sûr).

#include "socket_udp.h"
#include "messages.h"
#include <fstream>
#include <mutex>
#include <thread>
#include <vector>
#include <algorithm>
#include <atomic>
#include <iterator>
#include <iostream>
using namespace std;

Le programme principal se séparera en deux fils d'exécution :

  • emission, qui consommera du flux en entrée et produira des données sur un socket UDP, et
  • reception, qui consommera d'un socket UDP et produira des données dans un flux en sortie

Aucun signal de fin ne sera requis; l'erreur résultant de la fermeture d'un socket, joint à l'émission d'un message indiquant explicitement la fin de la transmission, suffiront à constater la fin des opérations.

//
// Pour un plus joli programme... séparer en deux programmes :)
//
int main() {
   using namespace socket_udp;
   chargeur ch;
   static constexpr const uint16_t ZE_PORT_EMETTEUR = 4567,
                                   ZE_PORT_RECEPTEUR = 5678;
   auto emett = creer(ZE_PORT_EMETTEUR),
        recep = creer(ZE_PORT_RECEPTEUR);

Le fil d'exécution emission consommera des données du disque et les enverra, un message à la fois, vers le récepteur. Un message de fin conclura l'envoi.

// ...
   thread emission([&emett]() {
      ifstream in{ "Principal.cpp" };
      try {
         int seq_id{};
         const auto dest_adr = "127.0.0.1"s;
         auto res = envoyer_vers(emett, dest_adr, ZE_PORT_RECEPTEUR, message{ ++seq_id, sorte_message::debut });
         for (string s; getline(in, s);)
            res = envoyer_vers(emett, dest_adr, ZE_PORT_RECEPTEUR, message{ ++seq_id, ligne{ s } });
         res = envoyer_vers(emett, dest_adr, ZE_PORT_RECEPTEUR, message{ ++seq_id, sorte_message::fin });
      } catch (erreur &err) {
         cerr << "Erreur " << err.what() << ", code " << err.code << endl;
      }
      fermer(std::move(emett));
   });

Le fil d'exécution reception consommera une trame à la fois et en inscrira le contenu dans un fichier de destination. La réception d'un message représentant la fin des envois conclura les opérations..

// ...
   thread reception([&recep]() {
      ofstream out{ "sortie.txt" };
      try {
         bool poursuivre = true;
         do {
            message msg;
            auto n = recevoir_de(recep, "127.0.0.1"s, ZE_PORT_EMETTEUR, msg);
            if (n)
               switch (msg.sorte) {
               case sorte_message::debut:
                  ;
               case sorte_message::ligne:
                  out << msg.ligne_ << '\n';
                  break;
               case sorte_message::fin:
                  poursuivre = false;
               }
         } while (poursuivre);
      } catch (erreur &err) {
         cerr << "Erreur " << err.what() << ", code " << err.code << endl;
      }
      fermer(std::move(recep));
   });

Avant de fermer le programme, nous attendons la complétion de l'exécution des deux fils d'exécution, puis (pour fins de débogage) nous affichons le contenu du fichier de destination à la console.

Si vous avez copié un fichier contenant des données autres que du texte, présumant que l'extension du fichier de destination corresponde à son contenu, il suffit (du moins avec Microsoft Windows) de remplacer l'appel à copy() par un appel à system() en lui passant le nom du fichier en question, ce qui démarrera le programme associé à ce type de fichier.

// ...
   emission.join();
   reception.join();
   ifstream in{ "sortie.txt" };
   copy(istreambuf_iterator<char>{ in }, istreambuf_iterator<char>{},
        ostream_iterator<char>{ cout });
}

Évidemment, ce code ne fonctionne que si les paquets arrivent au destinataire sans pertes et dans l'ordre selon lequel ils ont été émis. Ce scénario « idéal » n'est pas réaliste en général.

Version plus complète

Examinons maintenant une version plus complète (et plus réaliste, bien que très simple) d'un programme réalisant la même tâche que celui décrit ci-dessus.

Nous conserverons la même couche primitive de sockets UDP (voir socket_udp.h), mais raffinerons à la fois la modélisation des messages et le programme de test.

Fichier messages.h (plus complet)

Tout d'abord, notons que du fait que la représentation d'un message sera plus riche et plus complexe que dans la version précédente, j'ai fait le choix de loger les outils qui y participent dans un espace nommé msg.

#ifndef MESSAGES_H
#define MESSAGES_H
#include <string>
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iosfwd>
#include <iterator>
namespace msg {

Dans cette nouvelle déclinaison, les sortes de messages sont un peu plus nombreuses et incluent l'idée d'un message inconnu (par exemple, un message qui ne serait pas encore convenablement initialisé), un début de transmission, une fin de transmission, du texte (car je transférerai du texte), un accusé de réception, et un signal qu'une trame aurait été manquée.

Pour formaliser la numérotation du séquencement des messages, j'ai défini un type spécifique (sequence_type) ce qui permet au code client d'écrire facilement du code en concordance avec la nature de cette API.

   //
   // Sortes de messages (identifiants)
   //
   enum class sorte { inconnu, debut, fin, texte, ack, miss };
   //
   // pour le sequencement de messages
   //
   using sequence_type = unsigned int;

Pour clarifier la nature des messages en tant que tels, j'ai défini un type distinct pour chaque sorte de message.

Les deux plus simples, nommés ici message_debut et message_fin, sont des classes vides qui modélisent des concepts purs. Il serait possible de leur attribuer des membres (les autres types de messages ci-dessous en contiennent d'ailleurs), dans le respect de certaines règles.

   struct message_debut {
      // il pourrait y avoir quelque chose ici, au besoin
   };
   struct message_fin {
      // il pourrait y avoir quelque chose ici, au besoin
   };

La principale règle en ce sens est que les constructeurs de nos types de messages devront être triviaux, ce qui implique que les constructeurs de leurs attributs le soient aussi. La raison pour cette règle est que les messages seront modélisés par des union étiquetés, comme cela s'avère souvent avec des trames transmises par UDP, et que les membres d'un union doivent être trivialement constructibles.

Le type de message le plus complexe que nous utiliserons sera le message_texte, qui offre entre autres une interface permettant un parcours du texte qu'il inclut à l'aide d'itérateurs.

Remarquez la fonction de fabrication creer_texte(); n'ayant pas l'option d'utiliser des constructeurs non-triviaux, les fabriques sont une alternative intéressante pour définir des opérations d'initialisation qui ne sont pas banales.

   //
   // Un objet dans un union doit n'avoir que des constructeurs et des destructeurs triviaux
   //
   struct message_texte {
      using size_type = uint32_t;
      static constexpr const std::size_t CAPACITE_MAX = 1024;
      char data[CAPACITE_MAX];
      size_type size;
      using iterator = char*;
      using const_iterator = const char*;
      iterator begin() {
         return std::begin(data);
      }
      iterator end() {
         return begin() + size;
      }
      const_iterator begin() const {
         return std::begin(data);
      }
      const_iterator end() const {
         return begin() + size;
      }
   };
   message_texte creer_texte(const std::string&);

De manière plus humble, mais suivant le même modèle, les types de messages représentant un accusé de réception (message_ack) ou un signal de trame manquée (message_miss) contiennent le numéro de séquence de la trame reçue ou manquante, et les fonctions de fabrication make_ack() et make_miss() facilitent leur construction.

   struct message_ack {
      int acked_seq;
   };
   struct message_miss {
      int missed_seq;
   };
   message_ack make_ack(sequence_type ackeq_seq);
   message_miss make_miss(sequence_type misseq_seq);

Le type message en tant que tel est un union étiqueté. Son attribut seq_id indique son numéro de séquence, chose importante du fait que le protocole UDP ne garantit pas une livraison des trames dans l'ordre selon lequel elles ont été envoyées.

La paire faite de sorte_ et de l'union (anonyme) des divers types de messages constitue l'union étiqueté en tant que tel.

Les divers constructeurs permettent de bien initialiser un message en fonction de paramètres de types choisis.

   //
   //
   //
   struct message {
      sequence_type seq_id;
      sorte sorte_{ sorte::inconnu };
      union {
         message_debut debut;
         message_fin fin;
         message_texte texte;
         message_ack ack;
         message_miss miss;
      };
      message() = default;
      message(sequence_type seq_id, message_debut md)
         : seq_id{ seq_id }, sorte_{ sorte::debut }, debut{ md } {
      }
      message(sequence_type seq_id, message_fin mf)
         : seq_id{ seq_id }, sorte_{ sorte::fin }, fin{ mf } {
      }
      message(sequence_type seq_id, message_texte mt)
         : seq_id{ seq_id }, sorte_{ sorte::texte }, texte{ mt } {
      }
      message(sequence_type seq_id, message_ack ma)
         : seq_id{ seq_id }, sorte_{ sorte::ack }, ack{ ma } {
      }
      message(sequence_type seq_id, message_miss ms)
         : seq_id{ seq_id }, sorte_{ sorte::miss }, miss{ ms } {
      }
   };
}

Enfin, pour faciliter le débogage et les tests, une fonction pour afficher le contenu d'un message_texte est offerte.

std::ostream& operator<<(std::ostream&, const msg::message_texte&);
#endif

Fichier messages.cpp (plus complet)

Le fichier messages.cpp est relativement simple, se limitant à quelques fonctions de fabrication et un outil pour afficher du texte sur un flux.

#include "messages.h"
#include <ostream>
#include <algorithm>
#include <iterator>
#include <string>
#include <cassert>
using namespace std;

Cette dernière est des plus simples, considérant la présence d'iune interface simple d'itérateurs dans le type message_texte.

ostream& operator<<(ostream &os, const msg::message_texte &txt) {
   copy(begin(txt), end(txt), ostream_iterator<char>{ os });
   return os;
}

Les fonctions de fabrication pallient l'absence de constructeurs dans les types fabriqués; souvenons-nous que le recours à des constructeurs triviaux dans ces types tient à leur utilisation dans un union, où les règles sont contraignantes.

namespace msg {
   message_texte creer_texte(const std::string &s) {
      assert(s.size() < message_texte::CAPACITE_MAX);
      message_texte txt;
      txt.size = s.size();
      copy(begin(s), end(s), txt.begin());
      return txt;
   }
   message_ack make_ack(int ackeq_seq) {
      message_ack ack;
      ack.acked_seq = ackeq_seq;
      return ack;
   }
   message_miss make_miss(int misseq_seq) {
      message_miss miss;
      miss.missed_seq = misseq_seq;
      return miss;
   }
}

Fichier Principal.cpp (plus complet)

Le code de test est « plus complet », sans être « vraiment complet ». Pour être vraiment complet, il devrait tenir compte de beaucoup plus de formes d'attaque et de situations anormales qu'il ne le fait; en retour, en examinant quelques cas d'erreurs communs, il peut servir de guide.

#include "socket_udp.h"
#include "messages.h"
#include <fstream>
#include <mutex>
#include <thread>
#include <vector>
#include <algorithm>
#include <atomic>
#include <iterator>
#include <iostream>
#include <deque>
using namespace std;

Pour la communication, le modèle mis en application suppose que les messages doivent utiliser une numérotation croissante monotone () et que le destinataire s'attend à une telle numérotation. Le destinataire doit émettre un accusé de réception pour chaque message reçu (mais ne pas recevoir un accusé de réception n'est pas considéré comme une erreur; après tout, ces trames peuvent se perdre en chemin). Si une trame émise ne se rend pas à destination, ou si une trame semble escamotée (parce qu'une trame ultérieure semble arriver avant une trame antérieure), le destinataire fait signe à l'émetteur qui reprendra l'envoi des trames à partir de celle qui semble avoir fait défaut.

J'ai suivi le modèle suivant. Il est imparfait mais illustratif :

  • Un émetteur (fil d'exécution emetteur, accompagné du fil d'exécution nettoyeur_emission) prépare des messages, les numérote et les insère dans une file de messages à envoyer (attribut a_envoyer d'un canal_sortant)
  • Envoyer un message, outre l'écriture d'une trame UDP en tant que telle, implique prendre le prochain message sortant de la file a_envoyer et l'insérer dans une autre file (attribut envoyes de canal_sortant)
  • Les messages envoyés sont conservés dans la file envoyes jusqu'à réception d'un accusé de réception du destinataire
  • Sur réception d'un accusé de réception, le message envoyé portant le numéro de séquence indiqué dans l'accusé de réception est supprimé de la file de messages envoyés
  • Sur réception d'un message indiquant qu'une trame UDP manque (parce qu'elle est arrivée hors-séquence, perdue ou pour toute autre raison), la transmission reprend à partir du message portant le numéro de séquence semblant manquer. En pratique, les messages envoyés à partir de la trame manquante sont réinsérés dans la file des messages à envoyer

Le fil d'exécution emetteur enverra les messages. Le fil d'exécution nettoyeur_emisson consommera les accusés de réception et les signalements de trames manquantes. La communication entre eux implique de la synchronisation dû au partage des files envoyes et a_envoyer, d'où le type canal_sortant à droite.

Notez la signature de la plupart des services offerts ici, qui permet de valider le succès ou l'échec de l'entreprise modélisée par le service en tant que tel. Il existe plusieurs moyens de modéliser cette paire faite d'un (possible) résultat de calcul et d'un code de succès ou d'échec, mais l'important est de combiner les deux, pour éviter les TOCTTOU.

class canal_sortant {
   mutable mutex mae,
                 me;
   deque<msg::message> a_envoyer;
   deque<msg::message> envoyes;
public:
   void pour_envoi(msg::message msg) {
      lock_guard<mutex> _{ mae };
      a_envoyer.push_back(msg);
   }
   bool try_fetch_next(msg::message &msg) {
      lock_guard<mutex> _{ mae };
      if (a_envoyer.empty()) return false;
      msg = a_envoyer.front();
      a_envoyer.pop_front();
      return true;
   }
   void envoye(msg::message msg) {
      lock_guard<mutex> _{ me };
      envoyes.push_back(msg);
   }
   bool try_miss(msg::sequence_type seq) {
      lock(mae, me);
      lock_guard<mutex> lckmae{ mae, adopt_lock };
      lock_guard<mutex> lckme{ me, adopt_lock };
      auto pos = find_if(begin(envoyes), end(envoyes), [seq](const msg::message &msg) {
         return msg.seq_id == seq;
      });
      if (pos == end(envoyes))
         return false;
      // transaction
      a_envoyer.insert(begin(a_envoyer), pos, end(envoyes));
      envoyes.erase(pos);
      return true;
   }
   bool try_ack(msg::sequence_type seq) {
      lock_guard<mutex> _{ me };
      auto pos = find_if(begin(envoyes), end(envoyes), [seq](const msg::message &msg) {
         return msg.seq_id == seq;
      });
      if (pos == end(envoyes)) {
         cerr << "WARN : ack on non-acknowledgeable message (seq : " << seq << ")\n";
         return false;
      }
      if (pos != begin(envoyes)) {
         cerr << "WARN : ack out of sequence (seq : " << seq << ")\n";
      }
      envoyes.erase(pos);
      return true;
   }
};

Le programme de test se décline en quelques fils d'exécution, mais en pratique il serait plus réaliste d'avoir plusieurs programmes qui communiquent entre eux.

Dans cette version, j'ai ajouté un booléen atomique du fait que deux fils d'exécution se partagent la responsabilité du volet émetteur, et que je souhaite que les deux se terminent gracieusement.

int main() {
   using namespace socket_udp;
   using namespace msg;
   chargeur ch;
   static constexpr const uint16_t ZE_PORT_EMETTEUR = 4567,
                                   ZE_PORT_RECEPTEUR = 5678;
   auto emett = creer(ZE_PORT_EMETTEUR),
        recep = creer(ZE_PORT_RECEPTEUR);
   canal_sortant canal_emetteur;
   atomic<bool> fin_emission{ false };

L'un des deux fils d'exécution du volet « émission » est nettoyeur_emission.

Son rôle est de recevoir les trames ack (accusé de réception) et miss (trame manquante) du destinataire, et de réorganiser le canal_sortant nommé canal_emetteur en fonction de ces messages reçus. Évidemment, une synchronisation est requise, et est implémentée dans canal_sortant.

Je n'ai implémenté ici que quelques cas d'erreurs possibles; la communication par UDP est remplie de ... divertissements. Ceci se veut illustratif et éducatif, pas complet.

   thread nettoyeur_emission{ [&] {
      const auto dest_adr = "127.0.0.1"s;
      while (!fin_emission) {
         message msg;
         auto res = recevoir_de(emett, dest_adr, ZE_PORT_RECEPTEUR, msg);
         switch (msg.sorte_) {
         case msg::sorte::ack:
            canal_emetteur.try_ack(msg.seq_id);
            break;
         case msg::sorte::miss:
            canal_emetteur.try_miss(msg.seq_id);
            break;
         case msg::sorte::inconnu:
            break; // possible, surtout en début de parcours
         default:
            cerr << "Suspect (seq : " << msg.seq_id << ")\n";
         }
      }
   } };

L'autre fil d'exécution du volet « émission » est emission.

Celui-ci a pour rôle d'envoyer les messages destinés à son homologue, en tenant compte des réponses obtenues de ce dernier. C'est par le canal_sortant nommé canal_emetteur que cette information circulera, étant en partie consommée par le fil d'exécution nettoyeur_emission.

La logique de base est simple : s'il reste au moins un vieux message à envoyer, envoyons-le, sinon consommons le prochain message et ajoutons-le à ce qui doit être envoyé.

 

   thread emission{[&]() {
      ifstream in{ "Principal.cpp" };
      try {
         sequence_type seq_id{};
         const auto dest_adr = "127.0.0.1"s;
         auto debut = message{ ++seq_id, message_debut{} };
         auto res = envoyer_vers(emett, dest_adr, ZE_PORT_RECEPTEUR, debut);
         canal_emetteur.envoye(debut);
         for (bool done = false; !done;) {
            if (msg::message m; canal_emetteur.try_fetch_next(m)) {
               res = envoyer_vers(emett, dest_adr, ZE_PORT_RECEPTEUR, m);
            } else if (string s; getline(in, s)) {
               canal_emetteur.pour_envoi(message{ ++seq_id, creer_texte(s) });
            } else {
               done = true;
            }
         }
         res = envoyer_vers(emett, dest_adr, ZE_PORT_RECEPTEUR, message{++seq_id, message_fin{}});
      } catch (erreur &err) {
         cerr << "Erreur " << err.what() << ", code " << err.code << endl;
      }
      fin_emission = true;
      fermer(std::move(emett));
   }};

Enfin, le fil d'exécution consommateur se nomme reception. Son rôle est simple :

  • Consommer le prochain message
  • S'il s'agit d'un début, alors... débuter
  • S'il s'agit d'une fin, alors... terminer
  • S'il s'agit de contenu, alors le traiter et en accuser réception, sauf s'il est hors-séquence, dans quel cas mieux vaut en informer l'émetteur
   thread reception{[&] {
      ofstream out{ "sortie.txt" };
      try {
         bool poursuivre = true;
         msg::sequence_type plus_recent_seq{};
         do {
            msg::message m;
            auto n = recevoir_de(recep, "127.0.0.1"s, ZE_PORT_EMETTEUR, m);
            if (n)
               switch (m.sorte_) {
               case sorte::debut:
                  plus_recent_seq = m.seq_id;
                  break;
               case sorte::texte:
                  if (m.seq_id == plus_recent_seq + 1) {
                     out << m.texte << '\n';
                     envoyer_vers(recep, "127.0.0.1"s, ZE_PORT_EMETTEUR, msg::make_ack(m.seq_id));
                     plus_recent_seq = m.seq_id;
                  } else { // il y a d'autres cas; ceci est naïf
                     envoyer_vers(recep, "127.0.0.1"s, ZE_PORT_EMETTEUR, msg::make_miss(plus_recent_seq + 1));
                  }
                  break;
               case sorte::fin:
                  poursuivre = false;
               }
         } while (poursuivre);
      } catch (erreur &err) {
         cerr << "Erreur " << err.what() << ", code " << err.code << endl;
      }
      fermer(std::move(recep));
   }};

Ne reste plus enfin qu'à fermer les livres

   emission.join();
   reception.join();
   nettoyeur_emission.join();
   ifstream in{ "sortie.txt" };
   copy(istreambuf_iterator<char>{ in }, istreambuf_iterator<char>{},
        ostream_iterator<char>{ cout });
}

Voilà.


Valid XHTML 1.0 Transitional

CSS Valide !