Проблема: я хочу реализовать несколько php-рабочих процессов, которые прослушивают очередь MQ-сервера для асинхронных заданий. Проблема в том, что просто запуск этих процессов в качестве демонов на сервере не дает мне никакого контроля над экземплярами (Load, Status, locked) … кроме, возможно, для сброса ps-aux. Из-за этого я ищу среду выполнения, которая позволяет мне контролировать и контролировать экземпляры, либо на уровне системы (процесса), либо на более высоком уровне (какой-то Java-приложений)
Любые указатели?
Вот какой код может быть полезен.
<? define('WANT_PROCESSORS', 5); define('PROCESSOR_EXECUTABLE', '/path/to/your/processor'); set_time_limit(0); $cycles = 0; $run = true; $reload = false; declare(ticks = 30); function signal_handler($signal) { switch($signal) { case SIGTERM : global $run; $run = false; break; case SIGHUP : global $reload; $reload = true; break; } } pcntl_signal(SIGTERM, 'signal_handler'); pcntl_signal(SIGHUP, 'signal_handler'); function spawn_processor() { $pid = pcntl_fork(); if($pid) { global $processors; $processors[] = $pid; } else { if(posix_setsid() == -1) die("Forked process could not detach from terminal\n"); fclose(stdin); fclose(stdout); fclose(stderr); pcntl_exec(PROCESSOR_EXECUTABLE); die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n"); } } function spawn_processors() { global $processors; if($processors) kill_processors(); $processors = array(); for($ix = 0; $ix < WANT_PROCESSORS; $ix++) spawn_processor(); } function kill_processors() { global $processors; foreach($processors as $processor) posix_kill($processor, SIGTERM); foreach($processors as $processor) pcntl_waitpid($processor); unset($processors); } function check_processors() { global $processors; $valid = array(); foreach($processors as $processor) { pcntl_waitpid($processor, $status, WNOHANG); if(posix_getsid($processor)) $valid[] = $processor; } $processors = $valid; if(count($processors) > WANT_PROCESSORS) { for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) posix_kill($processors[$ix], SIGTERM); for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) pcntl_waitpid($processors[$ix]); } elseif(count($processors) < WANT_PROCESSORS) { for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++) spawn_processor(); } } spawn_processors(); while($run) { $cycles++; if($reload) { $reload = false; kill_processors(); spawn_processors(); } else { check_processors(); } usleep(150000); } kill_processors(); pcntl_wait(); ?>
Похоже, у вас уже есть MQ и работает в системе * nix, и вы просто хотите управлять людьми.
Очень простой способ сделать это – использовать экран GNU. Чтобы начать 10 работников, вы можете использовать:
#!/bin/sh for x in `seq 1 10` ; do screen -dmS worker_$x php /path/to/script.php worker$x end
Это запустит 10 работников в фоновом режиме, используя экраны с именем worker_1,2,3 и так далее.
Вы можете подключиться к экранам, запустив screen -r worker_ и перечислив работающих сотрудников, используя экран-список.
Для получения дополнительной информации это руководство может помочь: http://www.kuro5hin.org/story/2004/3/9/16838/14935
Также попробуйте:
Для производственных серверов я обычно рекомендую использовать обычные сценарии запуска системы, но я без проблем запускал команды экрана из сценариев запуска.
Вам действительно нужно, чтобы он постоянно работал?
Если вы хотите только создать новый процесс по запросу, вы можете зарегистрировать его как службу в xinetd.
демон сервера типа pcntl для PHP
Bellow – наша рабочая реализация ответа @chaos. Код для обработки сигналов был удален, так как этот скрипт живет обычно всего несколько миллисекунд.
Кроме того, в коде мы добавили 2 функции для сохранения сообщений между вызовами: restore_processors_state () и save_processors_state (). Мы использовали redis
там, но вы можете решить использовать реализацию в файлах.
Мы запускаем этот скрипт каждую минуту, используя cron. Cron проверяет, все ли процессы активны. Если нет – он снова запускает их и затем умирает. Если мы хотим убить существующие процессы, мы просто запускаем этот скрипт с аргументом kill
: php script.php kill
.
Очень удобный способ запускать рабочих без ввода скриптов в init.d.
<?php include_once dirname( __FILE__ ) . '/path/to/bootstrap.php'; define('WANT_PROCESSORS', 5); define('PROCESSOR_EXECUTABLE', '' . dirname(__FILE__) . '/path/to/worker.php'); set_time_limit(0); $run = true; $reload = false; declare(ticks = 30); function restore_processors_state() { global $processors; $redis = Zend_Registry::get('redis'); $pids = $redis->hget('worker_procs', 'pids'); if( !$pids ) { $processors = array(); } else { $processors = json_decode($pids, true); } } function save_processors_state() { global $processors; $redis = Zend_Registry::get('redis'); $redis->hset('worker_procs', 'pids', json_encode($processors)); } function spawn_processor() { $pid = pcntl_fork(); if($pid) { global $processors; $processors[] = $pid; } else { if(posix_setsid() == -1) die("Forked process could not detach from terminal\n"); fclose(STDIN); fclose(STDOUT); fclose(STDERR); pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE)); die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n"); } } function spawn_processors() { restore_processors_state(); check_processors(); save_processors_state(); } function kill_processors() { global $processors; foreach($processors as $processor) posix_kill($processor, SIGTERM); foreach($processors as $processor) pcntl_waitpid($processor, $trash); unset($processors); } function check_processors() { global $processors; $valid = array(); foreach($processors as $processor) { pcntl_waitpid($processor, $status, WNOHANG); if(posix_getsid($processor)) $valid[] = $processor; } $processors = $valid; if(count($processors) > WANT_PROCESSORS) { for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) posix_kill($processors[$ix], SIGTERM); for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) pcntl_waitpid($processors[$ix], $trash); } elseif(count($processors) < WANT_PROCESSORS) { for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++) spawn_processor(); } } if( isset($argv) && count($argv) > 1 ) { if( $argv[1] == 'kill' ) { restore_processors_state(); kill_processors(); save_processors_state(); exit(0); } } spawn_processors();
этот<?php include_once dirname( __FILE__ ) . '/path/to/bootstrap.php'; define('WANT_PROCESSORS', 5); define('PROCESSOR_EXECUTABLE', '' . dirname(__FILE__) . '/path/to/worker.php'); set_time_limit(0); $run = true; $reload = false; declare(ticks = 30); function restore_processors_state() { global $processors; $redis = Zend_Registry::get('redis'); $pids = $redis->hget('worker_procs', 'pids'); if( !$pids ) { $processors = array(); } else { $processors = json_decode($pids, true); } } function save_processors_state() { global $processors; $redis = Zend_Registry::get('redis'); $redis->hset('worker_procs', 'pids', json_encode($processors)); } function spawn_processor() { $pid = pcntl_fork(); if($pid) { global $processors; $processors[] = $pid; } else { if(posix_setsid() == -1) die("Forked process could not detach from terminal\n"); fclose(STDIN); fclose(STDOUT); fclose(STDERR); pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE)); die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n"); } } function spawn_processors() { restore_processors_state(); check_processors(); save_processors_state(); } function kill_processors() { global $processors; foreach($processors as $processor) posix_kill($processor, SIGTERM); foreach($processors as $processor) pcntl_waitpid($processor, $trash); unset($processors); } function check_processors() { global $processors; $valid = array(); foreach($processors as $processor) { pcntl_waitpid($processor, $status, WNOHANG); if(posix_getsid($processor)) $valid[] = $processor; } $processors = $valid; if(count($processors) > WANT_PROCESSORS) { for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) posix_kill($processors[$ix], SIGTERM); for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) pcntl_waitpid($processors[$ix], $trash); } elseif(count($processors) < WANT_PROCESSORS) { for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++) spawn_processor(); } } if( isset($argv) && count($argv) > 1 ) { if( $argv[1] == 'kill' ) { restore_processors_state(); kill_processors(); save_processors_state(); exit(0); } } spawn_processors();
не<?php include_once dirname( __FILE__ ) . '/path/to/bootstrap.php'; define('WANT_PROCESSORS', 5); define('PROCESSOR_EXECUTABLE', '' . dirname(__FILE__) . '/path/to/worker.php'); set_time_limit(0); $run = true; $reload = false; declare(ticks = 30); function restore_processors_state() { global $processors; $redis = Zend_Registry::get('redis'); $pids = $redis->hget('worker_procs', 'pids'); if( !$pids ) { $processors = array(); } else { $processors = json_decode($pids, true); } } function save_processors_state() { global $processors; $redis = Zend_Registry::get('redis'); $redis->hset('worker_procs', 'pids', json_encode($processors)); } function spawn_processor() { $pid = pcntl_fork(); if($pid) { global $processors; $processors[] = $pid; } else { if(posix_setsid() == -1) die("Forked process could not detach from terminal\n"); fclose(STDIN); fclose(STDOUT); fclose(STDERR); pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE)); die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n"); } } function spawn_processors() { restore_processors_state(); check_processors(); save_processors_state(); } function kill_processors() { global $processors; foreach($processors as $processor) posix_kill($processor, SIGTERM); foreach($processors as $processor) pcntl_waitpid($processor, $trash); unset($processors); } function check_processors() { global $processors; $valid = array(); foreach($processors as $processor) { pcntl_waitpid($processor, $status, WNOHANG); if(posix_getsid($processor)) $valid[] = $processor; } $processors = $valid; if(count($processors) > WANT_PROCESSORS) { for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) posix_kill($processors[$ix], SIGTERM); for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) pcntl_waitpid($processors[$ix], $trash); } elseif(count($processors) < WANT_PROCESSORS) { for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++) spawn_processor(); } } if( isset($argv) && count($argv) > 1 ) { if( $argv[1] == 'kill' ) { restore_processors_state(); kill_processors(); save_processors_state(); exit(0); } } spawn_processors();