Я создал простого издателя и потребителя, который подписывается в очереди, используя basic.consume
.
Мой потребитель подтверждает сообщения, когда работа выполняется без исключения. Всякий раз, когда я сталкиваюсь с исключением, я не понимаю сообщение и возвращаюсь раньше. Из подтвержденных сообщений исчезают только сообщения с подтвержденными сообщениями, поэтому они работают правильно.
Теперь я хочу, чтобы потребитель снова собирал неудавшиеся сообщения, но единственный способ пересмотреть эти сообщения – это перезапустить пользователя.
Как мне подойти к этому варианту использования?
Код установки
$channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName('my-exchange'); $exchange->setType('fanout'); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->declare(); $queue->bind('my-exchange');
Код потребителя
$queue->consume(array($this, 'callback')); public function callback(AMQPEnvelope $msg) { try { //Do some business logic } catch (Exception $ex) { //Log exception return; } return $queue->ack($msg->getDeliveryTag()); }
Код производителя
$exchange->publish('message');
Если сообщение не было подтверждено и приложение не работает, оно будет автоматически redelivered
повторно, а свойство redelivered
на конверте будет установлено в true
(если вы не будете использовать их no-ack = true
).
UPD:
Вы должны nack
сообщение с nack
в блоке catch
try { //Do some business logic } catch (Exception $ex) { //Log exception return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); }
Остерегайтесь бесконечно nacked сообщений, в то время как счет повторной доставки не реализован в RabbitMQ и в AMQP-протоколе.
Если вы не хотите общаться с такими сообщениями и просто хотите добавить некоторую задержку, вы можете захотеть добавить sleep()
или usleep()
перед nack
метода nack
, но это совсем не хорошая идея.
Существует множество методов решения проблемы повторного набора циклов:
1. Опираясь на Мертвые обмены письмами
2. Используйте для каждого сообщения или в очереди TTL
Примеры (обратите внимание, что для очереди ttl мы передаем только номер и для сообщения ttl – все, что будет числовой строкой):
2.1 За сообщение ttl:
$queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->declareQueue(); $queue->bind('my-exchange'); $exchange->publish( 'message at ' . microtime(true), null, AMQP_NOPARAM, array( 'expiration' => '1000' ) );
2.2. В очереди ttl:
$queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->setArgument('x-message-ttl', 1000); $queue->declareQueue(); $queue->bind('my-exchange'); $exchange->publish('message at ' . microtime(true));
3. Удерживайте количество повторных наборов или количество оставшихся повторных наборов (ака-ограничение или ttl в стеке IP) в теле сообщения или заголовках
Код:
$queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->declareQueue(); $queue->bind('my-exchange'); $exchange->publish( 'message at ' . microtime(true), null, AMQP_NOPARAM, array( 'headers' => array( 'ttl' => 100 ) ) ); $queue->consume( function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) { $headers = $msg->getHeaders(); echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' '; echo $msg->getDeliveryTag(), ' '; echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' '; echo $msg->getBody(), PHP_EOL; try { //Do some business logic throw new Exception('business logic failed'); } catch (Exception $ex) { //Log exception if (isset($headers['ttl'])) { // with ttl logic if ($headers['ttl'] > 0) { $headers['ttl']--; $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers)); } return $queue->ack($msg->getDeliveryTag()); } else { // without ttl logic return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue } } return $queue->ack($msg->getDeliveryTag()); } );
Могут быть некоторые другие способы улучшить управление потоками сообщений.
Вывод : нет серебряного пулевого раствора. Вы должны решить, какое решение соответствует вашим потребностям наилучшим образом или узнать что-то другое, но не забудьте поделиться им здесь;)
Если вы не хотите перезапускать пользователя, то basic.recover
AMQP может быть тем, что вы хотите. Согласно протоколу AMQP :
basic.recover(bit requeue) Redeliver unacknowledged messages. This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.