Как задержать? – php-amqplib

Я хотел бы знать, как задержать Amqpphplib.

Я использовал этот большой учебник по сценарию кофе:

https://github.com/jamescarr/rabbitmq-scheduled-delivery

но он не работает с PHP-amqplib.

Сообщение истекает, как я хочу, но кажется, что «x-dead-letter-exchange» не выполняет эту работу. Я использовал консоль управления RabbitMQ, и я вижу все создание и удаление очереди в режиме реального времени. Но мое сообщение после окончания срока действия переходит в ближайшую очередь. Я использую версию RabbitMQ 3.2.3, версию PHP-amqplib 2.2. *.

Вот мой код:

Класс подключения:

class Connection { /** * @var $ch */ public $ch; /** * @var $consumer_tag */ public $consumer_tag; /** * @var $exchange */ public $exchange; /** * @var $conn */ public $conn; public function __construct($host, $port, $user, $password, $vhost) { $this->exchange = 'immediate'; $this->queue = 'right.now.queue'; $this->consumer_tag = 'consumer'; $this->conn = new AMQPConnection($host, $port, $user, $password, $vhost); $this->ch = $this->conn->channel(); $this->ch->exchange_declare($this->exchange, 'direct', false, true, false); $this->ch->queue_declare($this->queue, false, true, false, false, false); $this->ch->queue_bind($this->queue, $this->exchange); } public function createDelayedQueue ($name, $delay_seconds) { $this->ch->queue_declare($name, false, false, false, true, true, array( "x-dead-letter-exchange" => array("S", $this->exchange), "x-message-ttl" => array("I", $delay_seconds*1000), "x-expires" => array("I", $delay_seconds*1000+1000) )); } } 

Опубликовать код

 $name = 'send.later.'.$ts; $amqp->createDelayedQueue($name, 2); $msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain', 'delivery_mode' => 2)); $amqp->ch->basic_publish($msg); 

Код потребителя

 $amqp = $this->getContainer()->get('amqp_connexion'); $amqp->ch->basic_consume($amqp->queue, $amqp->consumer_tag, false, false, false, false, function ($msg) { echo $msg->body; echo "\n--------\n"; }); $output->writeln('Listening '.$amqp->queue.'...'); // Loop as long as the channel has callbacks registered while (count($amqp->ch->callbacks)) { $amqp->ch->wait(); } 

Я просто написал упрощенную рабочую версию для php:

 /////// simplified /////// // include the AMQPlib Classes || use an autoloader // queue/exchange names $queueRightNow = 'right.now.queue'; $exchangeRightNow = 'right.now.exchange'; $queueDelayed5sec = 'delayed.five.seconds.queue'; $exchangeDelayed5sec = 'delayed.five.seconds.exchange'; $delay = 5; // delay in seconds // create connection $AMQPConnection = new \PhpAmqpLib\Connection\AMQPConnection('localhost',5672,'guest','guest'); // create a channel $channel = $AMQPConnection->channel(); // create the right.now.queue, the exchange for that queue and bind them together $channel->queue_declare($queueRightNow); $channel->exchange_declare($exchangeRightNow, 'direct'); $channel->queue_bind($queueRightNow, $exchangeRightNow); // now create the delayed queue and the exchange $channel->queue_declare( $queueDelayed5sec, false, false, false, true, true, array( 'x-message-ttl' => array('I', $delay*1000), // delay in seconds to milliseconds "x-expires" => array("I", $delay*1000+1000), 'x-dead-letter-exchange' => array('S', $exchangeRightNow) // after message expiration in delay queue, move message to the right.now.queue ) ); $channel->exchange_declare($exchangeDelayed5sec, 'direct'); $channel->queue_bind($queueDelayed5sec, $exchangeDelayed5sec); // now create a message und publish it to the delayed exchange $msg = new \PhpAmqpLib\Message\AMQPMessage( time(), array( 'delivery_mode' => 2 ) ); $channel->basic_publish($msg,$exchangeDelayed5sec); // consume the delayed message $consumeCallback = function(\PhpAmqpLib\Message\AMQPMessage $msg) { $messagePublishedAt = $msg->body; echo 'seconds between publishing and consuming: ' . (time()-$messagePublishedAt) . PHP_EOL; }; $channel->basic_consume($queueRightNow, '', false, true, false, false, $consumeCallback); // start consuming while (count($channel->callbacks) > 0) { $channel->wait(); } 

Если вы выбираете транспорт на основе interq на основе amqp, вам вообще не нужно вникать в детали. Только несколько вещей, чтобы сделать:

Установите enqueue/amqp-lib (кстати, вы можете использовать другие транспорты на основе amqp ext и большой bunny lib) для транспорта и enqueue/amqp-tools .

 composer require enqueue/amqp-lib enqueue/amqp-tools 

Создайте контекст amqp, добавьте стратегию задержки и отправьте сообщения с задержкой:

 <?php use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; use Enqueue\AmqpBunny\AmqpConnectionFactory; $context = (new AmqpConnectionFactory('amqp://'))->createContext(); $context->setDelayStrategy(new RabbitMqDlxDelayStrategy()) $queue = $context->createQueue('foo'); $context->declareQueue($queue); $message = $context->createMessage('Hello world!'); $context->createProducer() ->setDeliveryDelay(5000) // 5 sec ->send($queue, $message) ; 

Кстати, это не эта единственная стратегия. есть один, основанный на плагине задержки RabbitMQ. Его можно использовать одинаково.