RabbitMQ et Symfony Messenger par Frédéric Bouchery, Api Platform Conférence 2024

Ahmed Eben Hassine
Ahmed Eben hassine
25 septembre 2024

Codéin, expert en développement Symfony, intègre API Platform dans ses projets. Sponsor de l'API Platform Conférence 2024 mais également participants, nous avons eu l'opportunité de suivre la plupart des conférences.

Cet événement, principalement destiné aux experts techniques, a brillé par la richesse de son contenu. Afin de vous faire revivre les moments forts des conférences, nous avons préparé une série d'articles pour vous offrir un condensé des points clés qui ont particulièrement attiré notre attention.

Dans ce second article, nous vous invitons à plonger au sein de la conférence de Frédéric Bouchery.

 

Adopter un lapin

Frédéric BOUCHERY @FredBouchery

Frédéric Bouchery est expert PHP et lead chez CCM Benchmark

 

Frédéric Bouchery speaker Api Platform conférence 2024

 

Introduction à RabbitMQ : Un Outil Essentiel pour l'Orchestration Asynchrone

RabbitMQ

RabbitMQ est reconnu pour son efficacité dans l’orchestration des échanges des messages entre différentes applications, notamment dans le cadre de processus asynchrones. En plus de permettre la création de files d’attente et la répartition des tâches entre plusieurs travailleurs, ce système garantit une exécution fluide et une optimisation des ressources.


Dans cet article nous explorons ensemble les concepts de base de RabbitMQ et leur intégration avec Symfony Messenger et API platform.

Vocabulaire de base :

Pour bien comprendre le fonctionnement de RabbitMQ, il est indispensable de maîtriser quelques termes clés, que Frédéric a brillamment expliqués :

  1. Publisher : Il s'agit du composant qui envoie des messages vers un échange RabbitMQ, plutôt que directement dans une file d'attente. C'est l'élément qui déclenche la publication des messages.
  2. Exchange : Agit comme un point de réception pour les messages avant qu'ils ne soient dirigés vers les queues. Les exchanges sont de différents types (direct, topic, fanout, headers), et ils déterminent le routage des messages en fonction de ces types.
  3. Bind : Après réception des messages par l'exchange, une queue est associée à l'exchange à l'aide d'une clé de binding. Cela détermine les queues destinataires des messages.
  4. Binding Key : Clé utilisée pour diriger les messages de l'exchange vers les queues.
  5. Consumer : C'est le composant qui reçoit et traite les messages provenant d'une file d'attente.

Frédéric a particulièrement insisté sur le fait que l'exchange ne stocke pas les messages, mais joue un rôle clé dans leur direction vers les bonnes files d'attente selon des règles bien définies.

 

Les Queues : Comprendre leur Rôle et leur Configuration

Dans RabbitMQ, une queue agit comme une file d'attente, stockant temporairement les messages avant qu'ils ne soient consommés. Ces messages, envoyés via un exchange, sont ensuite récupérés et traités par les consommateurs.

Frédéric a pris le temps de détailler certaines caractéristiques clés des queues, notamment :

  • Durable : Une queue durable est stockée sur disque et survit au redémarrage du broker RabbitMQ.
  • Auto-delete : Une queue auto-delete est supprimée automatiquement lorsque toutes les connexions de consommateurs associées à cette queue sont fermées.

Dans Symfony Messenger, il est possible de désactiver ces comportements par défaut de la manière suivante:

framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queues:
                        default:
                            durable: true        # Persiste après redémarrage
                            auto_delete: true    # Supprimée quand il n’y a plus de consommateurs

Il est important de noter que les queues sont durables par défaut, ce qui signifie qu'elles survivent à un redémarrage du serveur RabbitMQ et les consommateurs peuvent reprendre le traitement.

Gestion des retries (Dead Letter) :

Parfois, il arrive qu'un consommateur ne parvienne pas à traiter un message. Essayer à nouveau peut permettre d'éviter l'accumulation de messages non traités et aider à maintenir le système performant.

Différents statuts d'un message :

  • ACK : Le message est acquitté et supprimé de la queue.
  • NACK ~ Reject : Le message est supprimé.
  • TTL expiré : Si le temps de vie d'un message est dépassé, il est supprimé.
  • Limite de longueur dépassée : Si la queue est trop pleine, le message est soit supprimé, soit envoyé dans une "dead-letter queue".
  • Autre ni ACK ni NACK : Le message est renvoyé dans sa queue indéfiniment.

Pour gérer ces messages échoués, plusieurs options s'offrent à nous :   

Stratégies pour gérer les retries :

  • Pas d’ACK ni de NACK : Ne rien faire, dans ce cas le message reste bloqué dans la queue, risquant de créer une boucle infinie.
  • Envoyer un ACK : Envoyer un accusé de réception, marquant ainsi la fin du traitement. Le message est traité avec succès et supprimé.
  • Envoyer un NACK avec DLX (Dead Letter Exchange) : Cela permet de rejeter le message et de le retourner à un autre exchange, est lié à une autre queue, et un TTL est défini. En fonction du TTL configuré, le message peut être remis en queue ou supprimé. Par exemple, cela permet de gérer les interruptions de service en offrant un délai pour résoudre les problèmes de connexion.

Symfony Messenger : Gère les retries de manière abstraite et offre une flexibilité applicative avec des stratégies comme les délais croissants ou un nombre limité de tentatives. C'est idéal pour un contrôle fin des retries via l'application elle-même.

RabbitMQ seul : Nécessite une configuration spécifique avec les Dead Letter Exchanges (DLX), offrant un contrôle indépendant de l'application. C'est une solution adaptée pour gérer des retries directement via l'infrastructure.

 

Guide d’utilisation de RabbitMQ avec Symfony Messenger et API Platform

Pour tirer parti de RabbitMQ avec Symfony Messenger et Api Platform, voici les étapes à suivre :

  • Installer les dépendances suivantes :
composer require symfony/messenger symfony/amqp-messenger
  • Activer l'attribute messenger à true
#[ApiResource(operations: [ new Post(messenger: true, output: false, status: 202) ])] 
#[Entity] 
final class Greeting
  • Configurer le routage du message en mappant l'entité App\Entity\Greeting à un transport nommé amqp_messages . Ensuite, configurer ce transport en utilisant la variable d'environnement contenant le DSN de RabbitMQ.

Par défaut, Symfony Messenger utilise un Fanout Exchange lorsqu'il est configuré pour fonctionner avec RabbitMQ. Cela signifie que tous les messages envoyés via l'exchange sont automatiquement distribués à toutes les queues liées, sans se soucier de la routing key. Voici un exemple de configuration :

framework:
    messenger:
      transports:
        amqp_messages:
          dsn: 'amqp://guest:guest@localhost:5672:%2f/messages'
          routing:
            'App\Message\Greeting': amqp_messages
  • La file de messages sera créée automatiquement, et le message Greeting sera routé vers le transport amqp_messages , qui à son tour publiera dans l'exchange "messages " créé automatiquement également.
  • Par défaut, tous les messages seront envoyés via le même transport, ce qui peut entraîner des délais si un message est plus long à traiter qu'un autre. La solution est de déclarer un nouveau transport avec une autre file. Plus de détails : https://symfony.com/doc/current/messenger.html#prioritized-transports.
  • Lorsque qu’une requête POST est envoyée, le système renvoie une réponse vide avec un statut 202, signifiant que la requête a été reçue mais que son traitement aura lieu ultérieurement. Pour ajuster ce comportement, il faut décorer le processeur de persistance (persist processor) et surcharger le service api_platform.doctrine.orm.state.persist_processor. Cette modification permet de persister l'entité et de déclencher l'envoi du message immédiatement après la persistance de l'entité.
<?php declare(strict_types=1);

namespace App\Decorator;

use ApiPlatform\Metadata\Operation;
use ApiPlatform\State\ProcessorInterface;
use App\Message\NotifyMessage;
use Symfony\Component\DependencyInjection\Attribute\AsDecorator;
use Symfony\Component\Messenger\MessageBusInterface;

#[AsDecorator('api_platform.doctrine.orm.state.persist_processor')]
readonly final class PersistProcessor implements ProcessorInterface
{
    public function __construct(
        private ProcessorInterface $decorated,
        private MessageBusInterface $bus
    ) {}

    public function process(mixed $data, Operation $operation, array $context = []): mixed
    {
        $entity = $this->decorated->process($data, $operation, $context);
        $this->bus->dispatch(new NotifyMessage($entity->name));
        return $entity;
    }
}
  • Lors de l'envoi et de la réception de messages via un transport, ceux-ci sont sérialisés à l'aide des fonctions natives serialize() et unserialize() de PHP. Pour une meilleure flexibilité, il est possible de modifier ce comportement globalement ou pour chaque transport en utilisant un service implémentant l'interface SerializerInterface, puis en le configurant comme suit :
framework:
    messenger:
        transports:
            amqp_messages:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                serializer: 'App\Serializer\MessageSerializer
  • Si le traitement d'un message échoue, il sera d'abord publié à un exchange delay , puis vers la queue temporaire delay_queue_1000. Ensuite, il sera renvoyé vers la queue principale pour un nouveau traitement. Si le message échoue à nouveau, il passera par le même processus, mais avec un DLX de deux secondes. Heureusement, tout cela est abstrait pour nous. Il suffit de mettre en place la configuration proposé par messenger https://symfony.com/doc/current/messenger.html#retries-failures

 

Questions / Réponses

Lors du déploiement d'une nouvelle version, il est nécessaire de redémarrer les workers pour qu'ils prennent en compte le nouveau code. Cela peut se faire en exécutant la commande messenger:stop-workers, qui envoie un signal SIGTERM pour indiquer aux workers de terminer leurs tâches en cours avant de s'arrêter.

Dans des environnements distribués, un exchange spécifique peut également diffuser un message de shutdown pour coordonner l'arrêt des consommateurs sur plusieurs machines ou conteneurs sans avoir à les lister manuellement.

 

Conclusion

La conférence de FrédéricBouchery nous a donné une belle compréhension de RabbitMQ, tant comme outil autonome que dans son intégration avec Symfony Messenger et Api platfrom. En abordant les différents types d'éxchanges, et les mécanismes de gestion des retries, Frédéric a présenté des solutions robustes pour optimiser la gestion des messages. Cette flexibilité permet de concevoir des systèmes plus complexes et adaptés aux besoins spécifiques des environnements distribués.

Restez connectés pour découvrir nos prochains résumés sur les autres conférences de l'événement !

À la recherche d'une nouvelle opportunité ? Voir toutes les offres
Découvrez quelques uns de nos projets Symfony Voir les projets

A lire aussi

Voir tous les articles