У меня есть функция, которая должна пересекать около 20 тыс. Строк из массива и применять для каждого внешнего скрипта. Это медленный процесс, так как PHP ждет выполнения сценария, прежде чем продолжить следующую строку.
Чтобы ускорить этот процесс, я думал о запуске функции в разных частях одновременно. Так, например, строки от 0 до 2000 как одна функция, от 2001 до 4000 на другой и т. Д. Как я могу сделать это аккуратно? Я мог бы создавать разные задания cron, по одному для каждой функции с разными параметрами: myFunction(0, 2000)
, затем другое задание cron с myFunction(2001, 4000)
и т. Д., Но это не кажется слишком чистым. Каков хороший способ сделать это?
Если вы хотите выполнять параллельные задачи в PHP, я бы подумал об использовании Gearman . Другим подходом было бы использование pcntl_fork () , но я предпочел бы фактических работников, когда он будет основан на задаче.
Единственное время, в которое вы страдаете, – это получение данных и обработка данных. Обработка данных на самом деле полностью блокируется (вам просто нужно подождать). Вероятно, вы не получите каких-либо преимуществ от увеличения количества процессов до количества ядер, которые у вас есть. В основном, я думаю, это означает, что количество процессов невелико, поэтому планирование выполнения 2-8 процессов не кажется таким отвратительным. Если вас беспокоит невозможность обработки данных во время извлечения данных, вы можете теоретически получить свои данные из базы данных в небольших блоках, а затем распределить нагрузку на обработку между несколькими процессами, по одному для каждого ядра.
Я думаю, что я больше привязываюсь к подходу дочерних процессов forking для фактического запуска потоков обработки. В комментариях на странице документа pcntl_fork есть яркая демонстрация, демонстрирующая реализацию класса демона задания
http://php.net/manual/en/function.pcntl-fork.php
<?php declare(ticks=1); //A very basic job daemon that you can extend to your needs. class JobDaemon{ public $maxProcesses = 25; protected $jobsStarted = 0; protected $currentJobs = array(); protected $signalQueue=array(); protected $parentPID; public function __construct(){ echo "constructed \n"; $this->parentPID = getmypid(); pcntl_signal(SIGCHLD, array($this, "childSignalHandler")); } /** * Run the Daemon */ public function run(){ echo "Running \n"; for($i=0; $i<10000; $i++){ $jobID = rand(0,10000000000000); while(count($this->currentJobs) >= $this->maxProcesses){ echo "Maximum children allowed, waiting...\n"; sleep(1); } $launched = $this->launchJob($jobID); } //Wait for child processes to finish before exiting here while(count($this->currentJobs)){ echo "Waiting for current jobs to finish... \n"; sleep(1); } } /** * Launch a job from the job queue */ protected function launchJob($jobID){ $pid = pcntl_fork(); if($pid == -1){ //Problem launching the job error_log('Could not launch new job, exiting'); return false; } else if ($pid){ // Parent process // Sometimes you can receive a signal to the childSignalHandler function before this code executes if // the child script executes quickly enough! // $this->currentJobs[$pid] = $jobID; // In the event that a signal for this pid was caught before we get here, it will be in our signalQueue array // So let's go ahead and process it now as if we'd just received the signal if(isset($this->signalQueue[$pid])){ echo "found $pid in the signal queue, processing it now \n"; $this->childSignalHandler(SIGCHLD, $pid, $this->signalQueue[$pid]); unset($this->signalQueue[$pid]); } } else{ //Forked child, do your deeds.... $exitStatus = 0; //Error code if you need to or whatever echo "Doing something fun in pid ".getmypid()."\n"; exit($exitStatus); } return true; } public function childSignalHandler($signo, $pid=null, $status=null){ //If no pid is provided, that means we're getting the signal from the system. Let's figure out //which child process ended if(!$pid){ $pid = pcntl_waitpid(-1, $status, WNOHANG); } //Make sure we get all of the exited children while($pid > 0){ if($pid && isset($this->currentJobs[$pid])){ $exitCode = pcntl_wexitstatus($status); if($exitCode != 0){ echo "$pid exited with status ".$exitCode."\n"; } unset($this->currentJobs[$pid]); } else if($pid){ //Oh no, our job has finished before this parent process could even note that it had been launched! //Let's make note of it and handle it when the parent process is ready for it echo "..... Adding $pid to the signal queue ..... \n"; $this->signalQueue[$pid] = $status; } $pid = pcntl_waitpid(-1, $status, WNOHANG); } return true; } }
в<?php declare(ticks=1); //A very basic job daemon that you can extend to your needs. class JobDaemon{ public $maxProcesses = 25; protected $jobsStarted = 0; protected $currentJobs = array(); protected $signalQueue=array(); protected $parentPID; public function __construct(){ echo "constructed \n"; $this->parentPID = getmypid(); pcntl_signal(SIGCHLD, array($this, "childSignalHandler")); } /** * Run the Daemon */ public function run(){ echo "Running \n"; for($i=0; $i<10000; $i++){ $jobID = rand(0,10000000000000); while(count($this->currentJobs) >= $this->maxProcesses){ echo "Maximum children allowed, waiting...\n"; sleep(1); } $launched = $this->launchJob($jobID); } //Wait for child processes to finish before exiting here while(count($this->currentJobs)){ echo "Waiting for current jobs to finish... \n"; sleep(1); } } /** * Launch a job from the job queue */ protected function launchJob($jobID){ $pid = pcntl_fork(); if($pid == -1){ //Problem launching the job error_log('Could not launch new job, exiting'); return false; } else if ($pid){ // Parent process // Sometimes you can receive a signal to the childSignalHandler function before this code executes if // the child script executes quickly enough! // $this->currentJobs[$pid] = $jobID; // In the event that a signal for this pid was caught before we get here, it will be in our signalQueue array // So let's go ahead and process it now as if we'd just received the signal if(isset($this->signalQueue[$pid])){ echo "found $pid in the signal queue, processing it now \n"; $this->childSignalHandler(SIGCHLD, $pid, $this->signalQueue[$pid]); unset($this->signalQueue[$pid]); } } else{ //Forked child, do your deeds.... $exitStatus = 0; //Error code if you need to or whatever echo "Doing something fun in pid ".getmypid()."\n"; exit($exitStatus); } return true; } public function childSignalHandler($signo, $pid=null, $status=null){ //If no pid is provided, that means we're getting the signal from the system. Let's figure out //which child process ended if(!$pid){ $pid = pcntl_waitpid(-1, $status, WNOHANG); } //Make sure we get all of the exited children while($pid > 0){ if($pid && isset($this->currentJobs[$pid])){ $exitCode = pcntl_wexitstatus($status); if($exitCode != 0){ echo "$pid exited with status ".$exitCode."\n"; } unset($this->currentJobs[$pid]); } else if($pid){ //Oh no, our job has finished before this parent process could even note that it had been launched! //Let's make note of it and handle it when the parent process is ready for it echo "..... Adding $pid to the signal queue ..... \n"; $this->signalQueue[$pid] = $status; } $pid = pcntl_waitpid(-1, $status, WNOHANG); } return true; } }
с<?php declare(ticks=1); //A very basic job daemon that you can extend to your needs. class JobDaemon{ public $maxProcesses = 25; protected $jobsStarted = 0; protected $currentJobs = array(); protected $signalQueue=array(); protected $parentPID; public function __construct(){ echo "constructed \n"; $this->parentPID = getmypid(); pcntl_signal(SIGCHLD, array($this, "childSignalHandler")); } /** * Run the Daemon */ public function run(){ echo "Running \n"; for($i=0; $i<10000; $i++){ $jobID = rand(0,10000000000000); while(count($this->currentJobs) >= $this->maxProcesses){ echo "Maximum children allowed, waiting...\n"; sleep(1); } $launched = $this->launchJob($jobID); } //Wait for child processes to finish before exiting here while(count($this->currentJobs)){ echo "Waiting for current jobs to finish... \n"; sleep(1); } } /** * Launch a job from the job queue */ protected function launchJob($jobID){ $pid = pcntl_fork(); if($pid == -1){ //Problem launching the job error_log('Could not launch new job, exiting'); return false; } else if ($pid){ // Parent process // Sometimes you can receive a signal to the childSignalHandler function before this code executes if // the child script executes quickly enough! // $this->currentJobs[$pid] = $jobID; // In the event that a signal for this pid was caught before we get here, it will be in our signalQueue array // So let's go ahead and process it now as if we'd just received the signal if(isset($this->signalQueue[$pid])){ echo "found $pid in the signal queue, processing it now \n"; $this->childSignalHandler(SIGCHLD, $pid, $this->signalQueue[$pid]); unset($this->signalQueue[$pid]); } } else{ //Forked child, do your deeds.... $exitStatus = 0; //Error code if you need to or whatever echo "Doing something fun in pid ".getmypid()."\n"; exit($exitStatus); } return true; } public function childSignalHandler($signo, $pid=null, $status=null){ //If no pid is provided, that means we're getting the signal from the system. Let's figure out //which child process ended if(!$pid){ $pid = pcntl_waitpid(-1, $status, WNOHANG); } //Make sure we get all of the exited children while($pid > 0){ if($pid && isset($this->currentJobs[$pid])){ $exitCode = pcntl_wexitstatus($status); if($exitCode != 0){ echo "$pid exited with status ".$exitCode."\n"; } unset($this->currentJobs[$pid]); } else if($pid){ //Oh no, our job has finished before this parent process could even note that it had been launched! //Let's make note of it and handle it when the parent process is ready for it echo "..... Adding $pid to the signal queue ..... \n"; $this->signalQueue[$pid] = $status; } $pid = pcntl_waitpid(-1, $status, WNOHANG); } return true; } }
не<?php declare(ticks=1); //A very basic job daemon that you can extend to your needs. class JobDaemon{ public $maxProcesses = 25; protected $jobsStarted = 0; protected $currentJobs = array(); protected $signalQueue=array(); protected $parentPID; public function __construct(){ echo "constructed \n"; $this->parentPID = getmypid(); pcntl_signal(SIGCHLD, array($this, "childSignalHandler")); } /** * Run the Daemon */ public function run(){ echo "Running \n"; for($i=0; $i<10000; $i++){ $jobID = rand(0,10000000000000); while(count($this->currentJobs) >= $this->maxProcesses){ echo "Maximum children allowed, waiting...\n"; sleep(1); } $launched = $this->launchJob($jobID); } //Wait for child processes to finish before exiting here while(count($this->currentJobs)){ echo "Waiting for current jobs to finish... \n"; sleep(1); } } /** * Launch a job from the job queue */ protected function launchJob($jobID){ $pid = pcntl_fork(); if($pid == -1){ //Problem launching the job error_log('Could not launch new job, exiting'); return false; } else if ($pid){ // Parent process // Sometimes you can receive a signal to the childSignalHandler function before this code executes if // the child script executes quickly enough! // $this->currentJobs[$pid] = $jobID; // In the event that a signal for this pid was caught before we get here, it will be in our signalQueue array // So let's go ahead and process it now as if we'd just received the signal if(isset($this->signalQueue[$pid])){ echo "found $pid in the signal queue, processing it now \n"; $this->childSignalHandler(SIGCHLD, $pid, $this->signalQueue[$pid]); unset($this->signalQueue[$pid]); } } else{ //Forked child, do your deeds.... $exitStatus = 0; //Error code if you need to or whatever echo "Doing something fun in pid ".getmypid()."\n"; exit($exitStatus); } return true; } public function childSignalHandler($signo, $pid=null, $status=null){ //If no pid is provided, that means we're getting the signal from the system. Let's figure out //which child process ended if(!$pid){ $pid = pcntl_waitpid(-1, $status, WNOHANG); } //Make sure we get all of the exited children while($pid > 0){ if($pid && isset($this->currentJobs[$pid])){ $exitCode = pcntl_wexitstatus($status); if($exitCode != 0){ echo "$pid exited with status ".$exitCode."\n"; } unset($this->currentJobs[$pid]); } else if($pid){ //Oh no, our job has finished before this parent process could even note that it had been launched! //Let's make note of it and handle it when the parent process is ready for it echo "..... Adding $pid to the signal queue ..... \n"; $this->signalQueue[$pid] = $status; } $pid = pcntl_waitpid(-1, $status, WNOHANG); } return true; } }
вы можете использовать "PTHREADS"
очень проста в установке и отлично работает на окнах
скачать здесь -> http://windows.php.net/downloads/pecl/releases/pthreads/2.0.4/
Извлеките zip-файл, а затем
переместите файл «php_pthreads.dll» в папку php \ ext \.
переместите файл «pthreadVC2.dll» в папку php \.
затем добавьте эту строку в свой файл «php.ini»:
extension=php_pthreads.dll
сохраните файл.
вы только что сделали 🙂
теперь давайте посмотрим пример того, как его использовать:
class ChildThread extends Thread { public $data; public function run() { /* Do some expensive work */ $this->data = 'result of expensive work'; } } $thread = new ChildThread(); if ($thread->start()) { /* * Do some expensive work, while already doing other * work in the child thread. */ // wait until thread is finished $thread->join(); // we can now even access $thread->data }
для получения дополнительной информации о PTHREADS читайте php docs здесь:
PHP DOCS PTHREADS
если вы используете WAMP, как я, то вы должны добавить «pthreadVC2.dll» в \ wamp \ bin \ apache \ ApacheX.XX \ bin, а также отредактировать файл «php.ini» (тот же путь) и добавить ту же строку как прежде
расширение = php_pthreads.dll
УДАЧИ!
Посмотрите на pcntl_fork . Это позволяет вам создавать дочерние процессы, которые затем могут выполнять отдельную работу, которая вам нужна.
Не уверен, что решение для вашей ситуации, но вы можете перенаправить вывод системных вызовов в файл, поэтому PHP не будет ждать завершения программы. Хотя это может привести к перегрузке вашего сервера.
http://www.php.net/manual/en/function.exec.php. Если с этой функцией запускается программа, чтобы она продолжала работать в фоновом режиме, выход программы должен быть перенаправлен на файл или другой выходной поток. В противном случае PHP зависнет до завершения программы.