Примечание . Это не то же самое, что и этот вопрос, который использует
MessageComponentInterface
. Вместо этого я используюWampServerInterface
, поэтому этот вопрос относится именно к этой части. Мне нужен ответ с примерами кода и объяснением, так как я вижу, что это полезно для других в будущем.
Я использую часть WAMP для Ratchet и ZeroMQ, и в настоящее время у меня есть рабочая версия учебника по интеграции push .
Я пытаюсь выполнить следующее:
У меня есть пункты (1) и (2), однако проблема у меня есть с третьей:
Во-первых: как я могу отправлять данные только каждому конкретному пользователю? Широковещательная передача отправляет его всем, если, возможно, «темы» в конечном итоге не являются индивидуальными идентификаторами пользователей?
Во-вторых: у меня большая проблема с безопасностью. Если я отправляю идентификатор пользователя, который хочет подписаться с клиентской стороны, что мне кажется, мне нужно, тогда пользователь может просто изменить переменную на идентификатор другого пользователя, и вместо этого будут возвращены их данные.
В-третьих: мне нужно запустить отдельный php-скрипт, содержащий код для zeromq, чтобы начать фактический цикл. Я не уверен, что это лучший способ сделать это, и я предпочел бы, чтобы эта работа полностью работала в кодовой базе, а не в отдельном php-файле. Это основная область, которую мне нужно отсортировать.
Следующий код показывает, что у меня есть.
Я буквально php bin/push-server.php
для запуска этого. Подписки и не-подписки выводятся на этот терминал для целей отладки.
$loop = React\EventLoop\Factory::create(); $pusher = Pusher; $context = new React\ZMQ\Context($loop); $pull = $context->getSocket(ZMQ::SOCKET_PULL); $pull->bind('tcp://127.0.0.1:5555'); $pull->on('message', array($pusher, 'onMessage')); $webSock = new React\Socket\Server($loop); $webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect $webServer = new Ratchet\Server\IoServer( new Ratchet\WebSocket\WsServer( new Ratchet\Wamp\WampServer( $pusher ) ), $webSock ); $loop->run();
-$loop = React\EventLoop\Factory::create(); $pusher = Pusher; $context = new React\ZMQ\Context($loop); $pull = $context->getSocket(ZMQ::SOCKET_PULL); $pull->bind('tcp://127.0.0.1:5555'); $pull->on('message', array($pusher, 'onMessage')); $webSock = new React\Socket\Server($loop); $webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect $webServer = new Ratchet\Server\IoServer( new Ratchet\WebSocket\WsServer( new Ratchet\Wamp\WampServer( $pusher ) ), $webSock ); $loop->run();
Я опустил бесполезный материал и сосредоточился на onMessage()
и onSubscribe()
.
public function onSubscribe(ConnectionInterface $conn, $topic) { $subject = $topic->getId(); $ip = $conn->remoteAddress; if (!array_key_exists($subject, $this->subscribedTopics)) { $this->subscribedTopics[$subject] = $topic; } $this->clients[] = $conn->resourceId; echo sprintf("New Connection: %s" . PHP_EOL, $conn->remoteAddress); } public function onMessage($entry) { $entryData = json_decode($entry, true); var_dump($entryData); if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) { return; } $topic = $this->subscribedTopics[$entryData['topic']]; // This sends out everything to multiple users, not what I want!! // I can't send() to individual connections from here I don't think :S $topic->broadcast($entryData); }
Это моя проблема – это отдельный php-файл, который, надеюсь, может быть интегрирован в другой код в будущем, но в настоящее время я не уверен, как правильно его использовать. Я могу получить идентификатор пользователя из сеанса? Мне все равно нужно отправить его с клиентской стороны …
// Thought sessions might work here but they don't work for subscription session_start(); $userId = $_SESSION['userId']; $loop = React\EventLoop\Factory::create(); $context = new ZMQContext(); $socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher'); $socket->connect("tcp://localhost:5555"); $i = 0; $loop->addPeriodicTimer(4, function() use ($socket, $loop, $userId, &$i) { $entryData = array( 'topic' => 'subscriptionTopicHere', 'userId' => $userId ); $i++; // So it doesn't go on infinitely if run from browser if ($i >= 3) { $loop->stop(); } // Send stuff to the queue $socket->send(json_encode($entryData)); });
$(document).ready(function() { var conn = new ab.Session( 'ws://localhost:8080' , function() { conn.subscribe('topicHere', function(topic, data) { console.log(topic); console.log(data); }); } , function() { console.warn('WebSocket connection closed'); } , { 'skipSubprotocolCheck': true } ); });
Вышеуказанное работает, но мне действительно нужно выяснить следующее:
Как я могу отправлять отдельные сообщения отдельным пользователям? Когда они посещают страницу, которая запускает соединение с websocket в JS, должен ли я также запускать скрипт, который загружает вещи в очередь в PHP (zeromq)? Это то, что я сейчас делаю вручную, и это просто неправильно .
При подписке на пользователя из JS не может быть безопасно захватить идентификатор пользователя из сеанса и отправить его с клиентской стороны. Это может быть фальшивка. Скажите, пожалуйста, что есть более простой способ, и если да, то как?
Примечание. Мой ответ здесь не содержит ссылок на ZeroMQ, поскольку я больше его не использую. Тем не менее, я уверен, что вы сможете выяснить, как использовать ZeroMQ с этим ответом, если вам нужно.
Прежде всего, Websocket RFC и WAMP Spec указывают, что тема для подписки должна быть строкой . Я немного обманываю здесь, но я все еще придерживаюсь спецификации: вместо этого я пропускаю JSON.
{ "topic": "subject here", "userId": "1", "token": "dsah9273bui3f92h3r83f82h3" }
JSON по-прежнему является строкой, но он позволяет мне передавать больше данных вместо «темы», и для PHP просто сделать json_decode()
на другом конце. Конечно, вы должны подтвердить, что вы действительно получаете JSON, но это зависит от вашей реализации.
Тема – тема, на которую подписывается пользователь. Вы используете это, чтобы решить, какие данные вы передадите пользователю.
Очевидно, идентификатор пользователя. Вы должны убедиться, что этот пользователь существует и ему разрешено подписываться, используя следующую часть:
Это должен быть один случайный сгенерированный токен, сгенерированный в вашем PHP, и переданный переменной JavaScript. Когда я говорю «одно использование», я имею в виду, что каждый раз, когда вы перезагружаете страницу (и, соответственно, каждый HTTP-запрос), ваша переменная JavaScript должна иметь новый токен. Этот токен должен храниться в базе данных по идентификатору пользователя.
Затем, как только запрос websocket выполняется, вы сопоставляете токены и идентификатор пользователя с данными в базе данных, чтобы убедиться, что пользователь действительно является тем, кем они себя называют, и они не возились с переменными JS.
Примечание. В обработчике событий вы можете использовать
$conn->remoteAddress
чтобы получить IP-адрес соединения, поэтому, если кто-то пытается подключиться злонамеренно, вы можете заблокировать их (зарегистрировать их или что-то еще).
Это работает, потому что каждый раз, когда приходит новое соединение, уникальный токен гарантирует, что ни у кого не будет доступа к данным подписки кого-либо еще.
Вот что я использую для запуска цикла и обработчика событий. Я создаю цикл, делая все создание объекта стиля декоратора и передавая в свой EventHandler (который я скоро приду) с петлей там тоже.
$loop = Factory::create(); new IoServer( new WsServer( new WampServer( new EventHandler($loop) // This is my class. Pass in the loop! ) ), $webSock ); $loop->run();
class EventHandler implements WampServerInterface, MessageComponentInterface { /** * @var \React\EventLoop\LoopInterface */ private $loop; /** * @var array List of connected clients */ private $clients; /** * Pass in the react event loop here */ public function __construct(LoopInterface $loop) { $this->loop = $loop; } /** * A user connects, we store the connection by the unique resource id */ public function onOpen(ConnectionInterface $conn) { $this->clients[$conn->resourceId]['conn'] = $conn; } /** * A user subscribes. The JSON is in $subscription->getId() */ public function onSubscribe(ConnectionInterface $conn, $subscription) { // This is the JSON passed in from your JavaScript // Obviously you need to validate it's JSON and expected data etc... $data = json_decode(subscription->getId()); // Validate the users id and token together against the db values // Now, let's subscribe this user only // 5 = the interval, in seconds $timer = $this->loop->addPeriodicTimer(5, function() use ($subscription) { $data = "whatever data you want to broadcast"; return $subscription->broadcast(json_encode($data)); }); // Store the timer against that user's connection resource Id $this->clients[$conn->resourceId]['timer'] = $timer; } public function onClose(ConnectionInterface $conn) { // There might be a connection without a timer // So make sure there is one before trying to cancel it! if (isset($this->clients[$conn->resourceId]['timer'])) { if ($this->clients[$conn->resourceId]['timer'] instanceof TimerInterface) { $this->loop->cancelTimer($this->clients[$conn->resourceId]['timer']); } } unset($this->clients[$conn->resourceId]); } /** Implement all the extra methods the interfaces say that you must use **/ }
неclass EventHandler implements WampServerInterface, MessageComponentInterface { /** * @var \React\EventLoop\LoopInterface */ private $loop; /** * @var array List of connected clients */ private $clients; /** * Pass in the react event loop here */ public function __construct(LoopInterface $loop) { $this->loop = $loop; } /** * A user connects, we store the connection by the unique resource id */ public function onOpen(ConnectionInterface $conn) { $this->clients[$conn->resourceId]['conn'] = $conn; } /** * A user subscribes. The JSON is in $subscription->getId() */ public function onSubscribe(ConnectionInterface $conn, $subscription) { // This is the JSON passed in from your JavaScript // Obviously you need to validate it's JSON and expected data etc... $data = json_decode(subscription->getId()); // Validate the users id and token together against the db values // Now, let's subscribe this user only // 5 = the interval, in seconds $timer = $this->loop->addPeriodicTimer(5, function() use ($subscription) { $data = "whatever data you want to broadcast"; return $subscription->broadcast(json_encode($data)); }); // Store the timer against that user's connection resource Id $this->clients[$conn->resourceId]['timer'] = $timer; } public function onClose(ConnectionInterface $conn) { // There might be a connection without a timer // So make sure there is one before trying to cancel it! if (isset($this->clients[$conn->resourceId]['timer'])) { if ($this->clients[$conn->resourceId]['timer'] instanceof TimerInterface) { $this->loop->cancelTimer($this->clients[$conn->resourceId]['timer']); } } unset($this->clients[$conn->resourceId]); } /** Implement all the extra methods the interfaces say that you must use **/ }
Это в основном это. Основные моменты здесь:
Вы должны обеспечить, чтобы все данные были проверены, а не попытка взлома, прежде чем что-либо сделать с ним. Записывайте все попытки подключения с помощью чего-то вроде Monolog и настройте отправку электронной почты, если происходит критическое событие (например, сервер перестает работать, потому что кто-то является ублюдком и пытается взломать ваш сервер).
Чтобы отправить конкретным пользователям, вам понадобится шаблон ROUTER-DEALER вместо PUB-SUB. Это объясняется в Руководстве в главе 3. Безопасность, если вы используете ZMQ v4.0, обрабатывается на уровне проводов, поэтому вы не видите ее в приложении. Это по-прежнему требует некоторой работы, если вы не используете привязку CZMQ, которая обеспечивает структуру аутентификации (zauth).
В принципе, для аутентификации вы устанавливаете обработчик inproc: //zeromq.zap.01 и отвечаете на запросы по этому сокету. ZeroMQ ZAP для RFC; есть также тестовый пример в основной программе libzmq / tests / test_security_curve.cpp.