25 października 2021

RabbitMq cz.3 – rozdzielenie zadań

No i powstał trzeci wpis na temat RabbitMq, dziwne ;p

Jak wiadomo RabbitMq nie służy jedynie do wysyłania informacji innej aplikacji, tak jak wspomniałem tutaj, możemy bowiem stworzyć kolejkę zadań i rozdzielać je konsumentom.

Wyobraźmy sobie życiowy przykład, wiele osób posiada konto w portalu facebook.com, istnieje tam opcja ściągnięcia swoich danych w postaci spakowanego archiwum. Generowanie takiego archiwum trochę trwa, więc portal nie każe nam czekać, request jest wysłany w mgnieniu oka, natomiast zwrotka jaką otrzymujemy jest mniej więcej taka:
„Gdy raport zostanie przygotowany, poinformujemy Ciebie wysyłając odpowiednią wiadomość e-mail”.

Co dzieje się w tej chwili?

Żądanie wygenerowania naszego archiwum trawiło właśnie do kolejki. Przed nami pewnie miliony osób również zażyczyło sobie takowe archiwum czekając aż się wygeneruje. Jak zapewne możemy się domyślać, bardzo długo musieli byśmy czekać na swoje żądanie, na szczęście Facebook jak również inne aplikacje, mają wielu konsumentów swoich kolejek, to jak kilka kas w Tesco 😉

Zatem, zaczynajmy…

Wysyłka wiadomości

<?php

require_once('vendor/autoload.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('rabbitmq', 5672, 'php', 'php');
$channel = $connection->channel();

$channel->queue_declare('phpmessage', false, false, false, false);

unset($argv[0]);

$msg = new AMQPMessage(implode(" ", $argv), ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'phpmessage');
$channel->basic_qos(null, 1, null);

$channel->close();
$connection->close();

Jak mogłeś zauważyć, wysyłka również uległa małej zmianie, skorzystaliśmy z funkcji „basic_qos” oraz ustawiliśmy parametr „delivery_mode„, wymienione elementy poprzednio pominęliśmy, dlaczego?

Najprościej odpowiedzieć, bo nie potrzebowaliśmy ich, pytanie zatem powinno brzmieć, do czego są nam potrzebne teraz?

Flaga „delivery_mode” ustawiona na wartość „2” oznacza, że kolejka nie zostanie utracona, nawet w sytuacji restartu RabbitMq. Wartość „2” możemy podać ręcznie, lub korzystają z przygotowanej do tego stałej wewnątrz klasy „AMQPMessage„.

Kolejną różnicą jest wspomniana wczesniej funkcja „basic_qos„, najważniejszym elementem, jest przekazany parametr drugi („prefetch_count„), dzięki niemu RabbitMq będzie wysyłał jedno zadanie na raz dla konsumenta. Przydzielać kolejne zadania będzie konsumentom którzy przetworzyli poprzednie.

Konsument

Pozostaje nam teraz zakodować odbiór wiadomości.

Pamiętaj, konsumenta uruchomimy wielokrotnie, dla testu odpalimy dwukrotnie, jednak nic nie stoi na przeszkodzie aby tą liczbę zwiększyć. Produkcyjnie konsumenci odpalani lub wyłączani są automatycznie w zależności od zapotrzebowania.

Kod źródłowy konsumenta prezentuje się następująco:

<?php

require_once('vendor/autoload.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('rabbitmq', 5672, 'php', 'php');
$channel = $connection->channel();

$echoMessage = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(rand(4, 20));

    $msg->ack();
};

$channel->queue_declare('phpmessage', false, false, false, false);

$channel->basic_consume('phpmessage', '', false, false, false, false, $echoMessage);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

Aby zasymulować czas potrzebny na przetworzenie danych przez konsumenta, powyższy przykład wykorzystuje funkcję „sleep” z zmiennym parametrem, gdyż w produkcyjnych warunkach, nigdy nie wiemy ile nasz program będzie potrzebował czasu na wykonanie czynności.

Ponadto, dzięki funkcji „sleep” możemy dokładnie zaobserwować jak RabbitMq rozdziela zadania.

Sposób działania można zobaczyć na poniższym filmie: