Я пытаюсь создать очередь сообщений с помощью Redis. Всякий раз, когда клиент отправляет новые данные, они добавляются в список.
Вот код для него
$client->lPush("my_queue", $data);
Теперь есть отдельный скрипт slave.php, который выдает новые данные и обрабатывает их. Код для slave.php
while (true) { list($queue, $message) = $client->brPop(["my_queue"], 0); /* Logic to process the data */ }
Я изменил сценарий запуска apache, чтобы slave.php начинал и останавливался с помощью apache. Это работает хорошо. Но после ожидания в течение нескольких минут команда brPop перестает слушать сообщение об ошибке следующим образом:
Uncaught exception 'Predis\Connection\ConnectionException' with message 'Error while reading line from the server [tcp://127.0.0.1:6379]' in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php:139 Stack trace: #0 /var/www/api/lib/predis-0.8/lib/Predis/Connection/StreamConnection.php(205): Predis\Connection\AbstractConnection->onConnectionError('Error while rea...') #1 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(128): Predis\Connection\StreamConnection->read() #2 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(120): Predis\Connection\AbstractConnection->readResponse(Object(Predis\Command\ListPopLastBlocking)) #3 /var/www/api/lib/predis-0.8/lib/Predis/Client.php(227): Predis\Connection\AbstractConnection->executeCommand(Object(Predis\Command\ListPopLastBlocking)) #4 /var/www/api/lib/slave.php(7): Predis\Client->__call('brPop', Array) #5 /var/www/api/lib/slave.php(7): Predis\Client->brPop(Array, 0) #6 {main} thrown in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php on line 139
Согласно документации, если список пуст, BLPOP / BRPOP блокирует соединение, пока другой клиент не выполнит операцию LPUSH или RPUSH с одним из ключей. Но этого не происходит в моем случае. В моем случае, когда брпоп блокирует соединение, он не прослушивается снова, даже когда новые данные поступают в список.
Какие изменения я должен внести, чтобы заставить это работать?
Теперь он работает для меня, но я не уверен, что это правильный способ сделать это. Теперь я улавливаю ошибку и рекурсивно вызываю функцию в случае сбоя соединения. Мой новый slave.php выглядит так:
function process_data() { try { $client = new \Predis\Client(); require_once("logger.php"); while (true) { list($queue, $message) = $client->brPop(["bookmark_queue"], 0); // logic } } catch (Exception $ex) { $error = $ex->getMessage(); log_error($error, "slave.php"); process_data(); // call the function recursively if connection fails } } process_data(); // call the function
Добавьте ?read_write_timeout=-1
в строку подключения.