PHP pThreads – Как вы выполняете сборку мусора?

Учитывая следующий код, как вы можете гарантировать, что завершенные объекты MyWorker будут уничтожены / освобождена их память?

Из-за того, что мне нужен мой скрипт, мне нужно ~ 50 потоков, которые постоянно получают данные из cURL и обрабатывают его.

Я попробовал оба варианта, когда нити никогда не покидают run() , или, как показано в этом примере кода, они оставляют запуск и имеют функцию сбора, чтобы создать новую копию.

Но не важно, что я ударил пределы памяти через минуту или около того. Не могли бы вы рассказать мне, что я делаю неправильно?

 class MyWorker extends Threaded { public $complete; public function __construct() {$this->complete = false;} public function run() {$this->complete = true;} } $pool = new Pool(50); for($i=0; $i<50; $i++) $pool->submit(new MyWorker()); $pool->collect(function($worker) { global $pool; if($worker->complete == true) $pool->submit(new MyWorker()); return $worker->complete; }); $pool->shutdown(); 

Зачем

Зачем мне все равно собирать?

В потоках Worker предоставляемых pthreads, требуется, чтобы программист сохранил правильные ссылки на объекты Threaded , которые выполняются. Это трудно реализовать программисту в пользовательской среде, так что pthreads предоставляет абстракцию Pool Workers которая поддерживает ссылки для вас.

Чтобы поддерживать эти ссылочные pthreads, необходимо знать, когда объект является мусором, для этой цели он предоставляет интерфейс Pool::collect . Pool::collect принимает закрытие, которое должно принимать объект Threaded и возвращать логическое значение true если прошедший объект завершен.

Как

Задача под рукой …

Чтобы продолжать отправлять задания для выполнения и не исчерпывать ресурсы, вы должны создать очередь завершенных задач для повторной отправки в Pool

Следующий код демонстрирует разумный способ сделать это:

 <?php define("LOG", Mutex::create()); /* thread safe log to stdout */ function slog($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { Mutex::lock(LOG); echo vsprintf( "{$message}\n", $args); Mutex::unlock(LOG); } } class Request extends Threaded { public function __construct($url) { $this->url = $url; } public function run() { $response = @file_get_contents($this->url); slog("%s returned %d bytes", $this->url, strlen($response)); $this->reQueue(); } public function getURL() { return $this->url; } public function isQueued() { return $this->queued; } public function reQueue() { $this->queued = true; } protected $url; protected $queued = false; } /* create a pool of 50 threads */ $pool = new Pool(50); /* submit 50 requests for execution */ while (@$i++<50) { $pool->submit(new Request(sprintf( "http://google.com/?q=%s", md5($i)))); } do { $queue = array(); $pool->collect(function($request) use ($pool, &$queue) { /* check for items to requeue */ if ($request->isQueued()) { /* get the url for the request, insert into queue */ $queue[] = $request->getURL(); /* allow this job to be collected */ return true; } }); /* resubmit completed tasks to pool */ if (count($queue)) { foreach ($queue as $queued) $pool->submit(new Request($queued)); } /* sleep for a couple of seconds here ... because, be nice ! */ usleep(2.5 * 1000000); } while (true); ?>