Intereting Posts
Форма Html / Php, не добавляющая в базу данных SQL Как я мог избежать использования MySQL-запроса в цикле While в PHP Предел верхней памяти для PHP / Apache Разрешить проверку только в том случае, если продукт обязательной категории находится в корзине Загрузите изображение с помощью JavaScript с другого сервера через AJAX Как интегрировать сервер Asterisk с внешней реляционной базой данных, например mysql? Magento 1.7 Не удается отправить заголовки; заголовки, уже отправленные как отправить содержимое узла в xml на другую страницу с помощью php? PHP newline не работает в текстовом файле Почему не удается совместить код? Ответ PHP и Android PHPExcel – клон-лист и сохранить его оригинальный стиль повторение всплывающего окна jquery в php Должен ли я обрезать расшифрованную строку после mcrypt_decrypt? CakePHP: Неустранимая ошибка: допустимый размер памяти 536870912 байт исчерпан (пытался выделить 52 байта) Как загрузить загруженные файлы с помощью php

Потребление не подтверждается сообщениями от RabbitMq

Я создал простого издателя и потребителя, который подписывается в очереди, используя basic.consume .

Мой потребитель подтверждает сообщения, когда работа выполняется без исключения. Всякий раз, когда я сталкиваюсь с исключением, я не понимаю сообщение и возвращаюсь раньше. Из подтвержденных сообщений исчезают только сообщения с подтвержденными сообщениями, поэтому они работают правильно.
Теперь я хочу, чтобы потребитель снова собирал неудавшиеся сообщения, но единственный способ пересмотреть эти сообщения – это перезапустить пользователя.

Как мне подойти к этому варианту использования?

Код установки

 $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName('my-exchange'); $exchange->setType('fanout'); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->declare(); $queue->bind('my-exchange'); 

Код потребителя

 $queue->consume(array($this, 'callback')); public function callback(AMQPEnvelope $msg) { try { //Do some business logic } catch (Exception $ex) { //Log exception return; } return $queue->ack($msg->getDeliveryTag()); } 

Код производителя

 $exchange->publish('message'); 

Solutions Collecting From Web of "Потребление не подтверждается сообщениями от RabbitMq"

Если сообщение не было подтверждено и приложение не работает, оно будет автоматически redelivered повторно, а свойство redelivered на конверте будет установлено в true (если вы не будете использовать их no-ack = true ).

UPD:

Вы должны nack сообщение с nack в блоке catch

  try { //Do some business logic } catch (Exception $ex) { //Log exception return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); } 

Остерегайтесь бесконечно nacked сообщений, в то время как счет повторной доставки не реализован в RabbitMQ и в AMQP-протоколе.

Если вы не хотите общаться с такими сообщениями и просто хотите добавить некоторую задержку, вы можете захотеть добавить sleep() или usleep() перед nack метода nack , но это совсем не хорошая идея.

Существует множество методов решения проблемы повторного набора циклов:

1. Опираясь на Мертвые обмены письмами

  • профи: надежный, стандартный, понятный
  • минусы: требуется дополнительная логика

2. Используйте для каждого сообщения или в очереди TTL

  • профи: легко реализовать, также стандартный, понятный
  • минусы: с длинными очередями вы можете потерять некоторое сообщение

Примеры (обратите внимание, что для очереди ttl мы передаем только номер и для сообщения ttl – все, что будет числовой строкой):

2.1 За сообщение ttl:

 $queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->declareQueue(); $queue->bind('my-exchange'); $exchange->publish( 'message at ' . microtime(true), null, AMQP_NOPARAM, array( 'expiration' => '1000' ) ); 

2.2. В очереди ttl:

 $queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->setArgument('x-message-ttl', 1000); $queue->declareQueue(); $queue->bind('my-exchange'); $exchange->publish('message at ' . microtime(true)); 

3. Удерживайте количество повторных наборов или количество оставшихся повторных наборов (ака-ограничение или ttl в стеке IP) в теле сообщения или заголовках

  • профи: дает вам дополнительный контроль над временем жизни сообщений на уровне приложений
  • минусы: значительные накладные расходы, в то время как вам нужно изменить сообщение и опубликовать его снова, конкретное приложение, непонятное

Код:

 $queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->declareQueue(); $queue->bind('my-exchange'); $exchange->publish( 'message at ' . microtime(true), null, AMQP_NOPARAM, array( 'headers' => array( 'ttl' => 100 ) ) ); $queue->consume( function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) { $headers = $msg->getHeaders(); echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' '; echo $msg->getDeliveryTag(), ' '; echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' '; echo $msg->getBody(), PHP_EOL; try { //Do some business logic throw new Exception('business logic failed'); } catch (Exception $ex) { //Log exception if (isset($headers['ttl'])) { // with ttl logic if ($headers['ttl'] > 0) { $headers['ttl']--; $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers)); } return $queue->ack($msg->getDeliveryTag()); } else { // without ttl logic return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue } } return $queue->ack($msg->getDeliveryTag()); } ); 

Могут быть некоторые другие способы улучшить управление потоками сообщений.

Вывод : нет серебряного пулевого раствора. Вы должны решить, какое решение соответствует вашим потребностям наилучшим образом или узнать что-то другое, но не забудьте поделиться им здесь;)

Если вы не хотите перезапускать пользователя, то basic.recover AMQP может быть тем, что вы хотите. Согласно протоколу AMQP :

 basic.recover(bit requeue) Redeliver unacknowledged messages. This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.