Почему не все потоки завершены?

Я пробовал пример из этого ответа Джо https://stackoverflow.com/a/32187103/2229367, и он отлично работает, но когда я попытался немного изменить этот код:

$pool = new Pool(4); while (@$i++<10) { $pool->submit(new class($i) extends Collectable { public function __construct($id) { $this->id = $id; } public function run() { printf( "Hello World from %d\n", $this->id); $this->html = file_get_contents('http://google.fr?q=' . $this->query); $this->setGarbage(); } public $id; public $html; }); } while ($pool->collect(function(Collectable $work){ printf( "Collecting %d\n", $work->id); var_dump($work->html); return $work->isGarbage(); })) continue; $pool->shutdown(); 

Граф «Hello world» отличается от графа «Сбор». Документы устарели. Как насчет этой проблемы?

Worker::collect не предназначен для получения результатов; Он не детерминирован.

Worker::collect предназначен только для запуска сбора мусора на объектах, на которые ссылаются в стек объектов Worker .

Если целью является обработка каждого результата по мере его появления, код может выглядеть примерно так:

 <?php $pool = new Pool(4); $results = new Volatile(); $expected = 10; $found = 0; while (@$i++ < $expected) { $pool->submit(new class($i, $results) extends Threaded { public function __construct($id, Volatile $results) { $this->id = $id; $this->results = $results; } public function run() { $result = file_get_contents('http://google.fr?q=' . $this->id); $this->results->synchronized(function($results, $result){ $results[$this->id] = $result; $results->notify(); }, $this->results, $result); } private $id; private $results; }); } do { $next = $results->synchronized(function() use(&$found, $results) { while (!count($results)) { $results->wait(); } $found++; return $results->shift(); }); var_dump($next); } while ($found < $expected); while ($pool->collect()) continue; $pool->shutdown(); ?> конечные <?php $pool = new Pool(4); $results = new Volatile(); $expected = 10; $found = 0; while (@$i++ < $expected) { $pool->submit(new class($i, $results) extends Threaded { public function __construct($id, Volatile $results) { $this->id = $id; $this->results = $results; } public function run() { $result = file_get_contents('http://google.fr?q=' . $this->id); $this->results->synchronized(function($results, $result){ $results[$this->id] = $result; $results->notify(); }, $this->results, $result); } private $id; private $results; }); } do { $next = $results->synchronized(function() use(&$found, $results) { while (!count($results)) { $results->wait(); } $found++; return $results->shift(); }); var_dump($next); } while ($found < $expected); while ($pool->collect()) continue; $pool->shutdown(); ?> конкретные <?php $pool = new Pool(4); $results = new Volatile(); $expected = 10; $found = 0; while (@$i++ < $expected) { $pool->submit(new class($i, $results) extends Threaded { public function __construct($id, Volatile $results) { $this->id = $id; $this->results = $results; } public function run() { $result = file_get_contents('http://google.fr?q=' . $this->id); $this->results->synchronized(function($results, $result){ $results[$this->id] = $result; $results->notify(); }, $this->results, $result); } private $id; private $results; }); } do { $next = $results->synchronized(function() use(&$found, $results) { while (!count($results)) { $results->wait(); } $found++; return $results->shift(); }); var_dump($next); } while ($found < $expected); while ($pool->collect()) continue; $pool->shutdown(); ?> 

Это, очевидно, не очень толерантно к ошибкам, но основное отличие заключается в том, что я использую общую коллекцию результатов Volatile , и я правильно синхронизирую, чтобы получить результаты в основном контексте по мере их появления.

Если вы хотите дождаться, когда все результаты станут доступными, и, возможно, избежите некоторых споров о блокировках – которые вы всегда должны избегать, если сможете, тогда код выглядел бы проще, что-то вроде:

 <?php $pool = new Pool(4); $results = new Volatile(); $expected = 10; while (@$i++ < $expected) { $pool->submit(new class($i, $results) extends Threaded { public function __construct($id, Volatile $results) { $this->id = $id; $this->results = $results; } public function run() { $result = file_get_contents('http://google.fr?q=' . $this->id); $this->results->synchronized(function($results, $result){ $results[$this->id] = $result; $results->notify(); }, $this->results, $result); } private $id; private $results; }); } $results->synchronized(function() use($expected, $results) { while (count($results) != $expected) { $results->wait(); } }); var_dump(count($results)); while ($pool->collect()) continue; $pool->shutdown(); ?> конечные <?php $pool = new Pool(4); $results = new Volatile(); $expected = 10; while (@$i++ < $expected) { $pool->submit(new class($i, $results) extends Threaded { public function __construct($id, Volatile $results) { $this->id = $id; $this->results = $results; } public function run() { $result = file_get_contents('http://google.fr?q=' . $this->id); $this->results->synchronized(function($results, $result){ $results[$this->id] = $result; $results->notify(); }, $this->results, $result); } private $id; private $results; }); } $results->synchronized(function() use($expected, $results) { while (count($results) != $expected) { $results->wait(); } }); var_dump(count($results)); while ($pool->collect()) continue; $pool->shutdown(); ?> конкретные <?php $pool = new Pool(4); $results = new Volatile(); $expected = 10; while (@$i++ < $expected) { $pool->submit(new class($i, $results) extends Threaded { public function __construct($id, Volatile $results) { $this->id = $id; $this->results = $results; } public function run() { $result = file_get_contents('http://google.fr?q=' . $this->id); $this->results->synchronized(function($results, $result){ $results[$this->id] = $result; $results->notify(); }, $this->results, $result); } private $id; private $results; }); } $results->synchronized(function() use($expected, $results) { while (count($results) != $expected) { $results->wait(); } }); var_dump(count($results)); while ($pool->collect()) continue; $pool->shutdown(); ?> 

Примечательно, что интерфейс Collectable уже реализован Threaded в самых последних версиях pthreads – именно тот, который вы должны использовать … всегда …

Документы устарели, извините за это … один человек …

Pthreads V3 гораздо менее прощен, чем V2. собирать не стоит в V3.

Правило № 1: Я делаю все свои запросы внутри потоков, избегая пропускать слишком большое количество данных внутри них. Это было нормально с V2, а не с V3. Я продолжаю передавать аргументы рабочим как можно более аккуратные. Это также позволяет ускорить процесс.

Правило № 2: я не перехожу по количеству потоков ЦП, доступных для каждого пула, и помечаю их соответственно циклом. Таким образом, я уверен, что накладные расходы на память отсутствуют с тонны пулов, и каждый раз, когда цикл завершен, я принудительно собираю мусор. Это оказалось необходимым для меня из-за очень высоких потребностей Рама в потоках, возможно, не ваше дело, но убедитесь, что ваш потребляемый баран не перейдет ваш лимит php. Больше вы передали аргументы в потоки большие, больше баран будет быстро расти.

Правило № 3: Правильно объявляйте свои массивы объектов у рабочих (массив), чтобы убедиться, что все результаты возвращены.

Вот базовый перезаписанный рабочий пример, следуя 3 правилам, как можно ближе к вашему примеру:

  • использует массив запросов для многопоточности.

  • собираемый инструмент для захвата результатов вместо сбора.

  • пакеты пулов в соответствии с процессором nb потоков, чтобы избежать накладных расходов на блок.

  • и каждый из них имел свою связь, а не проходил через рабочих.

  • нажав все результаты внутри массива в конце.

код:

  define("SQLHOST", "127.0.0.1"); define("SQLUSER", "root"); define("SQLPASS", "password"); define("SQLDBTA", "mydatabase"); $Nb_of_th=12; // (6 cpu cores in this example) $queries = array_chunk($queries, ($Nb_of_th));// whatever list of queries you want to pass to the workers $global_data=array();// all results from all pool cycles // first we set the main loops foreach ($queries as $key => $chunks) { $pool = new Pool($Nb_of_th, Worker::class);// 12 pools max $workCount = count($chunks); // second we launch the submits foreach (range(1, $workCount) as $i) { $chunck = $chunks[$i - 1]; $pool->submit(new MyWorkers($chunck)); } $data = [];// pool cycle result array $collector = function (\Collectable $work) use (&$data) { $isGarbage = $work->isGarbage(); if ($isGarbage) { $data[] = $work->result; // thread result } return $isGarbage; }; do { $count = $pool->collect($collector); $isComplete = count($data) === $workCount; } while (!$isComplete); array_push($global_data, $data);// push pool results into main //complete purge unset($data); $pool->shutdown(); unset($pool); gc_collect_cycles();// force garbage collector before new pool cycle } Var_dump($global_data); // results for all pool cycles class MyWorkers extends \Threaded implements \Collectable { private $isGarbage; public $result; private $process; public function __construct($process) { $this->process = $process; } public function run() { $con = new PDO('mysql:host=' . SQLHOST . ';dbname=' . SQLDBTA . ';charset=UTF8', SQLUSER, SQLPASS); $proc = (array) $this->process; // important ! avoid volatile destruction in V3 $stmt = $con->prepare($proc); $stmt->execute(); $obj = $stmt1->fetchall(PDO::FETCH_ASSOC); /* do whatever you want to do here */ $this->result = (array) $obj; // important ! avoid volatile destruction in V3 $this->isGarbage = true; } public function isGarbage() : bool { return $this->isGarbage; } } с  define("SQLHOST", "127.0.0.1"); define("SQLUSER", "root"); define("SQLPASS", "password"); define("SQLDBTA", "mydatabase"); $Nb_of_th=12; // (6 cpu cores in this example) $queries = array_chunk($queries, ($Nb_of_th));// whatever list of queries you want to pass to the workers $global_data=array();// all results from all pool cycles // first we set the main loops foreach ($queries as $key => $chunks) { $pool = new Pool($Nb_of_th, Worker::class);// 12 pools max $workCount = count($chunks); // second we launch the submits foreach (range(1, $workCount) as $i) { $chunck = $chunks[$i - 1]; $pool->submit(new MyWorkers($chunck)); } $data = [];// pool cycle result array $collector = function (\Collectable $work) use (&$data) { $isGarbage = $work->isGarbage(); if ($isGarbage) { $data[] = $work->result; // thread result } return $isGarbage; }; do { $count = $pool->collect($collector); $isComplete = count($data) === $workCount; } while (!$isComplete); array_push($global_data, $data);// push pool results into main //complete purge unset($data); $pool->shutdown(); unset($pool); gc_collect_cycles();// force garbage collector before new pool cycle } Var_dump($global_data); // results for all pool cycles class MyWorkers extends \Threaded implements \Collectable { private $isGarbage; public $result; private $process; public function __construct($process) { $this->process = $process; } public function run() { $con = new PDO('mysql:host=' . SQLHOST . ';dbname=' . SQLDBTA . ';charset=UTF8', SQLUSER, SQLPASS); $proc = (array) $this->process; // important ! avoid volatile destruction in V3 $stmt = $con->prepare($proc); $stmt->execute(); $obj = $stmt1->fetchall(PDO::FETCH_ASSOC); /* do whatever you want to do here */ $this->result = (array) $obj; // important ! avoid volatile destruction in V3 $this->isGarbage = true; } public function isGarbage() : bool { return $this->isGarbage; } } с  define("SQLHOST", "127.0.0.1"); define("SQLUSER", "root"); define("SQLPASS", "password"); define("SQLDBTA", "mydatabase"); $Nb_of_th=12; // (6 cpu cores in this example) $queries = array_chunk($queries, ($Nb_of_th));// whatever list of queries you want to pass to the workers $global_data=array();// all results from all pool cycles // first we set the main loops foreach ($queries as $key => $chunks) { $pool = new Pool($Nb_of_th, Worker::class);// 12 pools max $workCount = count($chunks); // second we launch the submits foreach (range(1, $workCount) as $i) { $chunck = $chunks[$i - 1]; $pool->submit(new MyWorkers($chunck)); } $data = [];// pool cycle result array $collector = function (\Collectable $work) use (&$data) { $isGarbage = $work->isGarbage(); if ($isGarbage) { $data[] = $work->result; // thread result } return $isGarbage; }; do { $count = $pool->collect($collector); $isComplete = count($data) === $workCount; } while (!$isComplete); array_push($global_data, $data);// push pool results into main //complete purge unset($data); $pool->shutdown(); unset($pool); gc_collect_cycles();// force garbage collector before new pool cycle } Var_dump($global_data); // results for all pool cycles class MyWorkers extends \Threaded implements \Collectable { private $isGarbage; public $result; private $process; public function __construct($process) { $this->process = $process; } public function run() { $con = new PDO('mysql:host=' . SQLHOST . ';dbname=' . SQLDBTA . ';charset=UTF8', SQLUSER, SQLPASS); $proc = (array) $this->process; // important ! avoid volatile destruction in V3 $stmt = $con->prepare($proc); $stmt->execute(); $obj = $stmt1->fetchall(PDO::FETCH_ASSOC); /* do whatever you want to do here */ $this->result = (array) $obj; // important ! avoid volatile destruction in V3 $this->isGarbage = true; } public function isGarbage() : bool { return $this->isGarbage; } }