RabbitMQ ждет завершения нескольких очередей

Хорошо, вот обзор того, что происходит:

M <-- Message with unique id of 1234 | +-Start Queue | | | <-- Exchange /|\ / | \ / | \ <-- bind to multiple queues Q1 Q2 Q3 \ | / <-- start of the problem is here \ | / \ | / \|/ | Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start | C <-- Consumer 

Поэтому у меня есть обмен, который подталкивает к нескольким очередям, каждая очередь имеет задачу, как только все задачи будут завершены, только тогда можно запустить Queue 4.

Таким образом, сообщение с уникальным идентификатором 1234 отправляется на обмен, обмен направляет его ко всем очередям задач (Q1, Q2, Q3 и т. Д.), Когда все задачи для идентификатора сообщения 1234 завершены, запустите Q4 для сообщения id 1234.

Как я могу это реализовать?

Использование Symfony2, RabbitMQBundle и RabbitMQ 3.x

Ресурсы:

  • http://www.rabbitmq.com/tutorials/amqp-concepts.html
  • http://www.rabbitmq.com/tutorials/tutorial-six-python.html

ОБНОВЛЕНИЕ # 1

Хорошо, я думаю, что это то, что я ищу:

  • https://github.com/videlalvaro/Thumper/tree/master/examples/parallel_processing

RPC с параллельной обработкой, но как установить идентификатор корреляции в качестве моего уникального идентификатора для группировки сообщений, а также определить, какая очередь?

Solutions Collecting From Web of "RabbitMQ ждет завершения нескольких очередей"

В учебнике RPC на сайте RabbitMQ есть способ передать «Идентификатор корреляции», который может идентифицировать ваши сообщения для пользователей в очереди.

Я бы рекомендовал использовать какой-то идентификатор с вашими сообщениями в первые 3 очереди, а затем еще один процесс для удаления сообщений из 3 в какие-то ведра. Когда эти ведра получают то, что я предполагаю, это завершение трех задач, отправьте окончательное сообщение в четвертую очередь для обработки.

Если вы отправляете более одного рабочего элемента в каждую очередь для одного пользователя, вам может потребоваться небольшая предварительная обработка, чтобы узнать, сколько элементов, которые пользователь помещает в очередь, так что процесс, дезактивирующий до того, как 4 знает, сколько нужно ожидать перед очередью вверх.


Я делаю свой rabbitmq на C #, так что извините, что мой псевдокод не в стиле php

 // Client byte[] body = new byte[size]; body[0] = uniqueUserId; body[1] = howManyWorkItems; body[2] = command; // Setup your body here Queue(body) 

 // Server // Process queue 1, 2, 3 Dequeue(message) switch(message.body[2]) { // process however you see fit } processedMessages[message.body[0]]++; if(processedMessages[message.body[0]] == message.body[1]) { // Send to queue 4 Queue(newMessage) } 

Ответ на обновление №1

Вместо того, чтобы думать о вашем клиенте как о терминале, может быть полезно подумать о клиенте как о процессе на сервере. Поэтому, если вы настроите RPC-клиент на сервере, подобном этому , тогда все, что вам нужно сделать, это заставить обработчик сервера генерировать уникальный идентификатор пользователя и отправлять сообщения в соответствующие очереди:

  public function call($uniqueUserId, $workItem) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( serialize(array($uniqueUserId, $workItem)), array('correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue) ); $this->channel->basic_publish($msg, '', 'rpc_queue'); while(!$this->response) { $this->channel->wait(); } // We assume that in the response we will get our id back return deserialize($this->response); } $rpc = new Rpc(); // Get unique user information and work items here // Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need. $response = rpc->call($uniqueUserId, $workItem); $responseBuckets[array[0]]++; // Just like above code that sees if a bucket is full or not 

Вам необходимо реализовать это: http://www.eaipatterns.com/Aggregator.html, но RabbitMQBundle для Symfony не поддерживает это, поэтому вам придется использовать базовый php-amqplib.

Обычный обратный вызов потребителя из пакета получит AMQPMessage. Оттуда вы можете получить доступ к каналу и вручную опубликовать все последующие обмены в вашей реализации «трубы и фильтры»

Я немного не понимаю, чего вы пытаетесь достичь здесь. Но я, вероятно, несколько изменил бы дизайн, чтобы после того, как все сообщения будут удалены из очередей, которые вы публикуете, на отдельный обмен, который публикуется в очереди 4.

В дополнение к моему ответу на RPC я хочу добавить еще один, основанный на шаблоне агрегатора EIP .

Идея следующая: все асинхронно, не RPC или другие вещи синхронизации. Каждая задача отправляет даже когда это делается, агрегатор подписывается на это событие. Он в основном учитывает задачи и отправляет сообщение task4, когда счетчик достигает ожидаемого числа (в нашем случае 3). Я выбираю файловую систему как хранилище для счетчиков для Простоты. Вы можете использовать базу данных там.

Производитель выглядит проще. Он просто стреляет и забывает

 <?php use Enqueue\Client\Message; use Enqueue\Client\ProducerInterface; use Enqueue\Util\UUID; use Symfony\Component\DependencyInjection\ContainerInterface; /** @var ContainerInterface $container */ /** @var ProducerInterface $producer */ $producer = $container->get('enqueue.client.producer'); $message = new Message('the task data'); $message->setCorrelationId(UUID::generate()); $producer->sendCommand('task1', clone $message); $producer->sendCommand('task2', clone $message); $producer->sendCommand('task3', clone $message); 

Обработчик задач должен отправить событие после выполнения задания:

 <?php use Enqueue\Client\CommandSubscriberInterface; use Enqueue\Client\Message; use Enqueue\Client\ProducerInterface; use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Psr\PsrProcessor; class Task1Processor implements PsrProcessor, CommandSubscriberInterface { private $producer; public function __construct(ProducerInterface $producer) { $this->producer = $producer; } public function process(PsrMessage $message, PsrContext $context) { // do the job // same for other $eventMessage = new Message('the event data'); $eventMessage->setCorrelationId($message->getCorrelationId()); $this->producer->sendEvent('task_is_done', $eventMessage); return self::ACK; } public static function getSubscribedCommand() { return 'task1'; } } 

А агрегаторный процессор:

 <?php use Enqueue\Client\TopicSubscriberInterface; use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Psr\PsrProcessor; use Symfony\Component\Filesystem\LockHandler; class AggregatorProcessor implements PsrProcessor, TopicSubscriberInterface { private $producer; private $rootDir; /** * @param ProducerInterface $producer * @param string $rootDir */ public function __construct(ProducerInterface $producer, $rootDir) { $this->producer = $producer; $this->rootDir = $rootDir; } public function process(PsrMessage $message, PsrContext $context) { $expectedNumberOfTasks = 3; if (false == $cId = $message->getCorrelationId()) { return self::REJECT; } try { $lockHandler = new LockHandler($cId, $this->rootDir.'/var/tasks'); $lockHandler->lock(true); $currentNumberOfProcessedTasks = 0; if (file_exists($this->rootDir.'/var/tasks/'.$cId)) { $currentNumberOfProcessedTasks = file_get_contents($this->rootDir.'/var/tasks/'.$cId); if ($currentNumberOfProcessedTasks +1 == $expectedNumberOfTasks) { unlink($this->rootDir.'/var/tasks/'.$cId); $this->producer->sendCommand('task4', 'the task data'); return self::ACK; } } file_put_contents($this->rootDir.'/var/tasks/'.$cId, ++$currentNumberOfProcessedTasks); return self::ACK; } finally { $lockHandler->release(); } } public static function getSubscribedTopics() { return 'task_is_done'; } } 

Я могу показать вам, как вы можете это сделать с помощью пакета enqueue-bundle .

Поэтому установите его вместе с композитором и зарегистрируйте как любой другой комплект. Затем настройте:

 // app/config/config.yml enqueue: transport: default: 'amnqp://' client: ~ 

Этот подход основан на RPC. Вот как вы это делаете:

 <?php use Enqueue\Client\ProducerInterface; use Symfony\Component\DependencyInjection\ContainerInterface; /** @var ContainerInterface $container */ /** @var ProducerInterface $producer */ $producer = $container->get('enqueue.client.producer'); $promises = new SplObjectStorage(); $promises->attach($producer->sendCommand('task1', 'the task data', true)); $promises->attach($producer->sendCommand('task2', 'the task data', true)); $promises->attach($producer->sendCommand('task3', 'the task data', true)); while (count($promises)) { foreach ($promises as $promise) { if ($replyMessage = $promise->receiveNoWait()) { // you may want to check the response here $promises->detach($promise); } } } $producer->sendCommand('task4', 'the task data'); 

Потребительский процессор выглядит так:

 use Enqueue\Client\CommandSubscriberInterface; use Enqueue\Consumption\Result; use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Psr\PsrProcessor; class Task1Processor implements PsrProcessor, CommandSubscriberInterface { public function process(PsrMessage $message, PsrContext $context) { // do task job return Result::reply($context->createMessage('the reply data')); } public static function getSubscribedCommand() { // you can simply return 'task1'; if you do not need a custom queue, and you are fine to use what enqueue chooses. return [ 'processorName' => 'task1', 'queueName' => 'Q1', 'queueNameHardcoded' => true, 'exclusive' => true, ]; } } 

Добавьте его в свой контейнер в качестве службы с тегом enqueue.client.processor и запустите команду bin/console enqueue:consume --setup-broker -vvv

Вот простая версия PHP .