Это правильный способ использования очереди сообщений?

Я новичок в очередях обмена сообщениями, и сейчас я использую ZeroMQ на своем Linux-сервере. Я использую PHP для написания как клиента, так и сервера. Это в основном используется для обработки push-уведомлений.

Как я уже ZMQContext , я использую базовый шаблон формальной связи REQ REP ZMQContext экземпляров ZMQContext ввода-вывода.

Вот минимальный код zeromqServer.php :

 include("someFile.php"); $context = new ZMQContext(1); // Socket to talk to clients $responder = new ZMQSocket($context, ZMQ::SOCKET_REP); $responder->bind("tcp://*:5555"); while (true) { $request = $responder->recv(); printf ("Received request: [%s]\n", $request); // ----------------------------------------------------------------- // Process push notifications here // sleep (1); // ----------------------------------------------------------------- // Send reply back to client $responder->send("Basic Reply"); } в include("someFile.php"); $context = new ZMQContext(1); // Socket to talk to clients $responder = new ZMQSocket($context, ZMQ::SOCKET_REP); $responder->bind("tcp://*:5555"); while (true) { $request = $responder->recv(); printf ("Received request: [%s]\n", $request); // ----------------------------------------------------------------- // Process push notifications here // sleep (1); // ----------------------------------------------------------------- // Send reply back to client $responder->send("Basic Reply"); } 

И вот минимизированный клиент ZeroMQ :

 $context = new ZMQContext(); // Socket to talk to server echo "Connecting to hello world server…\n"; $requester = new ZMQSocket($context, ZMQ::SOCKET_REQ); $check = $requester->connect("tcp://localhost:5555"); var_dump($check); $requester->send("json string payload with data required to process push notifications."); //$reply = $requester->recv(); 

Итак, что я делаю? Я запускаю zeromqServer.php в качестве фоновой службы, используя команду linux

nohup php zeromqServer.php &

Это работает как фоновый процесс. Теперь, когда клиент называет это, он выполняет требуемую работу.

Но проблема в том, что мне нужно перезапустить процесс каждый раз, когда есть изменения в любом из файлов (включая те, которые include в файл zeromqServer ).

И более того, как-то через 2-3 дня он просто перестает работать. Процесс не останавливается, но он просто перестает работать.

Я чувствую, что это должна быть проблема сокета, возможно, сокет больше не открыт. В то время мне пришлось перезапустить процесс zeromqServer.php .

Вопрос Q1: Что может быть проблемой?

Q2: И каков правильный способ сделать это?

Это неправильный способ использования очереди сообщений.

A1: Проблема заключается в том, что ваш сервер, наконец, должен заблокировать, так как клиентский код не получает ответа, в то время как REQ/REP -pattern требует этого. zeromqServer.php на стороне REP просто не будет пытаться recv() другое сообщение от связанного клиента (на стороне REQ от Formal Communication Patter), пока клиент не будет физически доставлен (во внутренний буфер) и имеет recv() – сообщение «ответ» с стороны zeromqServer.php .

Более ранние версии ZeroMQ, вер. 2.1 и др., Использовались для неограниченного, бесконечного размера ограничения по умолчанию для внутренних очередей сообщений узла и управления памятью, используемых для буферизации низкоуровневого ввода-вывода, до того, как данные были скопированы в O / S-ядро ресурсов и освобожден из памяти ZeroMQ.

Более новые версии ZeroMQ, ver 3.x +, имеют так называемые HWM- файлы (например, High-Water-Mark-s) по умолчанию «всего лишь 1000 сообщений», которые содержат «короткие», после чего запускается соответствующий фрагмент такого ресурса ZeroMQ для блокировки или удаления сообщений.

Хотя реактивная попытка явно увеличить управление HWM-настройками выглядит грязным способом решения основной ошибки дизайна, другое предупреждение ZeroMQ справедливо по этому поводу, чтобы еще более осторожно идти именно в этом направлении (ØMQ не гарантирует, что сокет будет принимать как многие – как сообщения ZMQ_SNDHWM , а фактический предел может быть на 60-70% ниже в зависимости от потока сообщений в сокете).

Забыть или не в состоянии сделать это ( Ref: как показывает код OP):

 //$reply = $requester->recv(); 

означает, что ваш REQ/REP Formal Communication Pattern «маятник» становится необратимо запертым (навсегда).


A2: Basic REQ/REP Formal-Communication-Pattern звучит прямо, но имеет несколько опасных функций, наблюдаемая блокировка – это лишь одна из них. Некоторые дополнительные шаги могут быть предприняты по коду, для развертывания XREQ/XREP , DEALER/ROUTER и других инструментов, но дизайн должен быть пересмотрен в XREQ/XREP , а не только SLOC SLOC , поскольку, похоже, есть много чего реализовать, перед созданием кода. Одна из основных ошибок заключается в том, что сообщение отправляется после send() как был send() метод send() . Не в ZeroMQ.

Кроме того, код-код должен предполагать неопределенный характер доставки сообщения и надлежащим образом обрабатывать как недостающую проблему с сообщением, так и любой инцидент блокировки распределенных сервисов (взаимоблокировка, переполнение буфера, переполнение буфера-порога, старые / новые API-интерфейсы, поскольку нет никакой явной гарантии, что любой из ваших одноранговых узлов обмена сообщениями (в ZeroMQ отсутствует центральный брокер сообщений) имеет ту же самую версию API / протокола ZeroMQ, реализованную на его стороне localhost, поэтому в действительности есть много новых точек зрения, во время дизайн кода)


Лучший способ сделать это

Если вы можете доверять и верить в практический опыт, вашим лучшим следующим шагом должно быть скачать и прочитать сказочную книгу Pieter HINTJENS «Код подключен, том 1» , где Pieter имеет много информации о распределенной обработке , включая множество подсказок и направлений для надежных шаблонов, которые вы хотите реализовать.

Прочитайте книгу, это тоже стоит вашего времени, и вы, вероятно, будете многократно пересматривать книгу, если вы останетесь в распределенном программном обеспечении, поэтому не стесняйтесь начинать прямо сейчас, перепрыгивая на такую ​​кулинарную книгу на 400+ страниц от Мастера мастеров , Pieter HINTJENS вне вопросов.

Зачем? Реальный мир, как правило, намного сложнее

Только одна картина, изображенная на рис. 60 из вышеупомянутой книги, чтобы забыть о повторном использовании отдельного архетипа и осознать необходимость надлежащей сквозной перспективы проектирования распределенной системы, включая стратегии предотвращения блокировки и взаимоблокировки :

Транспортная плоскость + плоскость SIG

Просто, чтобы иметь представление, посмотрите на следующий пример кода из простой распределенной передачи сообщений, где процесс aMiniRESPONDER использует несколько каналов aMiniRESPONDER .

введите описание изображения здесь


Как улучшить вашу реализацию в довольно большом веб-домене PHP-приложения?

Узнайте, как предотвратить конфликты (дизайн-мудрый) и дескриптор (deux-ex-machina type).

У PHP есть собственные правильные синтаксические конструкторы для этого типа алгоритмизации, но архитектура и дизайн в ваших руках, от начала до конца.

Чтобы понять, насколько велика проблема с конфликтами: { try:, except:, finally: } установки / инфраструктуры системы ZeroMQ / частичное завершение работы ZeroMQ, проверьте номера [SoW] только по номерам строк:

 14544 - 14800 // a safe infrastructure setup on aMiniRESPONDER side ~ 256 SLOCs 15294 - 15405 // a safe infrastructure graceful termination ~ 110 SLOCs 

по сравнению с основной логикой сегмента обработки aMiniRESPONDER примера aMiniRESPONDER

 14802 - 15293 // aMiniRESPONDER logic, incl. EXC-HANDLERs ~ 491 SLOCs 

Окончательная заметка о распределенных системах на основе ZeroMQ

Требуя? Да, но очень мощный, масштабируемый, быстрый и действительно полезный при правильном использовании. Не стесняйтесь вкладывать свое время и усилия, чтобы приобрести и управлять своими знаниями в этой области. Все ваши дальнейшие проекты программного обеспечения могут только выиграть от этих профессиональных инвестиций.

Я могу лишь частично ответить на этот вопрос. Я понятия не имею, почему процесс зависает через 2-3 дня.

В целом сценарий PHP загружается один раз во время выполнения скрипта, похоже, что это ограничение не существует. Однако ваш текущий код можно переписать следующим образом:

somefile.php

 $context = new ZMQContext(1); // Socket to talk to clients $responder = new ZMQSocket($context, ZMQ::SOCKET_REP); $responder->bind("tcp://*:5555"); while (true) { $request = $responder->recv(); printf ("Received request: [%s]\n", $request); $subscriptParams = [ "Request" => $request //Add more parameters here as needed ]; $result = shell_exec("php work.php ".base64_encode(serialize($subscriptParams))); // Send reply back to client $responder->send("Basic Reply"); } 

work.php

 if (!isset($argv[0])) { die("Need an argument"); } $params = unserialize(base64_decode($argv[0])); //Validate parameters $request = $params["Request"]; // Do some 'work' //Process push notifications here! sleep (1); 

Вот мои предположения:

  1. Часть установки сценария, например установка $context и $responder будет оставаться постоянной навсегда (или вы могли бы жить со временем простоя, необходимым для перезапуска скрипта из-за изменений в этом).

  2. Начало и конец цикла остаются постоянными, моя идея в том, что shell_exec вернет ответ, который может быть использован ответчиком в качестве фактического ответа.

Еще несколько разъяснений:

Я использую serialize чтобы передать массив аргументов, которые нужно использовать work.php . Я делаю декодирование с кодировкой base64 потому что хочу, чтобы весь аргумент вписывался в $argv[0] а не делился на потенциальные пространства, найденные в аргументах.

Для результата work.php можно использовать ту же serialize -> base64_encode и base64_decode -> deserialize work.php .

Обратите внимание, что я лично не пробовал этот код, поэтому я не могу гарантировать, что он работает. Я просто не вижу причин, по которым он не должен работать (убедитесь, что php находится в вашем пути или вызовите /usr/bin/php в shell_exec если это не так).

Стоит отметить, что это решение будет довольно медленным, чем наличие всего кода в одном файле, но это стоимость обновления скрипта на каждой итерации.

Q2: И каков правильный способ сделать это?

Правильный способ сделать что-либо на мой взгляд – выбрать лучший вариант, доступный для вас в начале, чтобы не вкладывать время в метод, который даст вам результаты второго сорта. Я ничего не имею против ZeroMQ хотя я следую логике, что программисты должны всегда стремиться делать чистый код и использовать наилучшие инструменты. В случае создания очереди сообщений с PHP вам будет намного больше успеха с Pheanstalk : https://github.com/pda/pheanstalk

Это рекомендуемый вариант с открытым исходным кодом онлайн и отлично работает на Linux. Установка очереди очень проста, и я написал полный ответ о том, как установить pheanstalk в следующем разделе: Невозможно заставить Beanstalkd Queue работать для PHP

Pheanstalk использует библиотеку beanstalkd, которая очень легка и эффективна. Чтобы преподать очередь сообщений по вашему вопросу, вы можете сделать это с помощью двух простых скриптов php:

Производитель сообщений:

 <?php $pheanstalk = new Pheanstalk('127.0.0.1:11300'); $pheanstalk ->useTube("my_queue") ->put("Hello World"); ?> 

Рабочий сценарий:

 <?php if ($job = $pheanstalk ->watch('testtube') ->ignore('default') ->reserve())//retreives the job if there is one in the queue { echo $job->getData();//echos the message $pheanstalk->delete($job);//deletes the job from the queue } } ?> по <?php if ($job = $pheanstalk ->watch('testtube') ->ignore('default') ->reserve())//retreives the job if there is one in the queue { echo $job->getData();//echos the message $pheanstalk->delete($job);//deletes the job from the queue } } ?> 

Проигрыватель сообщений будет включен в страницу, где пользователь создаст сообщение, и это будет отправлено в очередь, сгенерированную beanstalkd. Рабочий сценарий может быть разработан по-разному. Вы можете поместить его в цикл while для поиска новой очереди каждую секунду, и вы даже можете иметь несколько рабочих, которые ищут очередь. Pheanstalk очень эффективен и настоятельно рекомендуется.