Im ищет безопасный и быстрый способ использования общего объекта.
Я задал вопрос уже здесь: https://github.com/krakjoe/pthreads/issues/470, но это было неправильно.
Попытка совместного использования объекта (Threaded) со многими другими контекстами (Thread). Все потоки обновляют этот объект осколков – они могут устанавливать собственные запросы и также отвечать на запросы от других.
Теперь, когда krakjoe ответил, что блокировка / разблокировка не будет доступна в 7, у меня возникла проблема.
Я знаю о: .синхронизированном, но понятия не имею, как использовать его, чтобы заставить его работать для моих нужд.
Как я могу использовать :: synchronized для написания таких методов, как
РЕДАКТИРОВАТЬ:
Я написал (imo) очень простой тестовый скрипт.
этот скрипт не включает методы syc / lock / … atm.
он должен просто показать, что я пытаюсь сделать.
im все еще ищет способ использования :: для обеспечения безопасности общего доступа.
код:
<?php /* TEST: create n threads each will - Shared::set() its own ref - check if Shared::exists() its own ref - Shared::get() its ref back - call method ::isRunning() at returned val to easily check if is ref or got overwritten by another context TODO: using ::synchronized to handle multi-context-access NOTES: every method as public to prevent pthreads v2 "Method Modifiers - Special Behaviour" see: "Method Modifiers - Special Behaviour" at http://blog.krakjoe.ninja/2015/08/a-letter-from-future.html */ class Shared extends Threaded { public $data; public function exists($ident) { return isset($this->data[$ident]); } public function set($ident, $ref) { $return = false; if(!isset($this->data[$ident])){ $data = $this->data; $data[$ident] = $ref; $this->data = $data; $return = $this->data[$ident]; } #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL; return $return; } public function get($ident) { $return = false; if($this->exists($ident) === true){ $data = $this->data; $return = $data[$ident]; unset($data[$ident]); $this->data = $data; } #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL; return $return; } } class T extends Thread { public $count; public function __construct(Shared $Shared, $ident) { $this->Shared = $Shared; $this->ident = $ident; } public function run() { $slowdown = true; $this->count = 0; while(true){ if($slowdown){ // "don't allow usleep or sleep" : https://github.com/krakjoe/pthreads/commit/a157b34057b0f584b4db326f30961b5c760dead8 // loop a bit to simulate work: $start = microtime(true); $until = rand(1, 100000)/1000000; while(microtime(true)-$start < $until){ // ... } } if($this->Shared->exists($this->ident) === true){ $ref = $this->Shared->get($this->ident); } else{ $ref = $this->Shared->set($this->ident, $this); } // calling a method on $ref -- if not a ref we crash $ref->isRunning(); unset($ref); $this->count++; } } } echo 'start ...' . PHP_EOL; $n = 8; $Shared = new Shared(); for($i = 0, $refs = array(); $i < $n; $i++){ $refs[$i] = new T($Shared, $i); $refs[$i]->start(); } while(!empty($refs)){ // print status: if(!isset($t)or microtime(true)-$t > 1){ $t = microtime(true); echo 'status: ' . count($refs) . ' running atm ...' . PHP_EOL; } // join crashed threads: foreach($refs as $i => $thread){ if($thread->isRunning() === false){ echo 'T-' . $i . ' stopped after ' . $thread->count . PHP_EOL; if($thread->isJoined() === false){ $thread->join(); } unset($refs[$i]); } } } echo 'no thread running anymore.' . PHP_EOL; /* output start ... status: 8 running atm ... Notice: Undefined offset: 6 in ...\shared_test.php on line 33 Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82 T-6 stopped after 10 status: 7 running atm ... Notice: Undefined offset: 4 in ...\shared_test.php on line 33 Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82 T-4 stopped after 35 status: 6 running atm ... Notice: Undefined offset: 7 in ...\shared_test.php on line 33 Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82 T-7 stopped after 43 status: 5 running atm ... status: 5 running atm ... status: 5 running atm ... [...] */ ?>
Threaded
объекты уже потокобезопасны, то есть, каждый раз, когда вы читаете, пишите, проверяете наличие или удаляете (отменять) элемент, операция атомарна – ни один другой контекст не может выполнять какие-либо из вышеупомянутых операций, пока первая операция. То же самое верно для обработчиков двигателей, о которых пользователь не знает, все вплоть до самого низкого уровня неявно безопасно.
Совершенно повторное завершение, однако … Это имеет очевидные ограничения, когда логика становится более сложной, например, проверяя существование члена перед установкой или выполнением чего-то еще с ним, как вы это делаете: хотя операции над объектом являются атомарными , нет ничего, чтобы остановить другой контекст, не устанавливающий член между вызовом isset
и вызовом для чтения свойства / измерения.
Это относится к PHP7 (pthreads v3 +)
Безопасность и целостность здесь две разные вещи. Когда целостность важна, вы можете использовать Threaded::synchronized
в PHP7, чтобы сохранить ее правильно. В PHP5 вы также можете сохранить его, но код будет более сложным, как и объяснение.
Второй пример должен выполняться бесконечно, если я понимаю, что это логика. Поэтому я использую это предположение для построения правильного кода, я собираюсь сделать дополнительные предположения о том, что вы можете сделать в этом бесконечном цикле, и дать некоторое представление о том, где, по-видимому, требуется.
<?php class Referee extends Threaded { public function find(string $ident, Threaded $reference) { return $this->synchronized(function () use($ident, $reference) { if (isset($this[$ident])) { return $this[$ident]; } else return ($this[$ident] = $reference); }); } public function foreach(Closure $closure) { $this->synchronized(function() use($closure) { foreach ($this as $ident => $reference) { $closure($ident, $reference); } }); } } class Test extends Thread { public function __construct(Referee $referee, string $ident, bool $delay) { $this->referee = $referee; $this->ident = $ident; $this->delay = $delay; } public function run() { while (1) { if ($this->delay) { $this->synchronized(function(){ $this->wait(1000000); }); } $reference = $this->referee->find($this->ident, $this); /* do something with reference here, I guess */ /* do something with all references here */ $this->referee->foreach(function($ident, $reference){ var_dump(Thread::getCurrentThreadId(), $reference->getIdent(), $reference->isRunning()); }); } } public function getIdent() { return $this->ident; } private $referee; private $ident; private $delay; } $referee = new Referee(); $threads = []; $thread = 0; $idents = [ "smelly", "dopey", "bashful", "grumpy", "sneezy", "sleepy", "happy", "naughty" ]; while ($thread < 8) { $threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1)); $threads[$thread]->start(); $thread++; } foreach ($threads as $thread) $thread->join(); ?>
Поэтому мы рассмотрим различия, я скажу вам, почему они такие, как они есть, и как еще вы могли бы их написать, вы уже знаете, что сейчас мы говорим не о безопасности, а о целостности, вам предоставляется замечательное) предположение, что все, что вы пишете, «безопасно», как объяснено.
Первое существенное различие заключается в следующем:
if ($this->delay) { $this->synchronized(function(){ $this->wait(1000000); }); }
Это просто подходящий способ сделать Thread
, вам не нужно будет использовать сам Thread
для синхронизации, вы можете использовать любой объект Threaded
. Преимущество делать вещи правильно, в случае, если не ясно, что спать и спать не оставляют потоки в восприимчивом состоянии, используя ::wait
.
В реальном мире, где вы действительно должны только ждать чего-то, это будет более сложный блок, он может (и должен) выглядеть больше:
if ($this->delay) { $this->synchronized(function(){ while ($this->condition) { $this->wait(1000000); } }); }
Примечание: ожидание таймаута технически ждет чего-то, однако вас может разбудить что-то другое, кроме достигнутого таймаута, и код должен быть подготовлен к этому.
Таким образом, другой контекст может уведомить Thread
том, что он должен прекратить ожидание и выключение изящно или выполнить какое-либо другое важное действие немедленно, просто путем синхронизации, изменения состояния и уведомления Thread
.
Для предсказуемого кода чрезвычайно важно понять, как синхронизироваться, ждать и уведомлять работу.
Затем у нас есть своя логика для установки и получения ссылки:
$reference = $this->referee->find($this->ident, $this);
Что вызывает это:
public function find(string $ident, Threaded $reference) { return $this->synchronized(function () use($ident, $reference) { if (isset($this[$ident])) { return $this[$ident]; } else return ($this[$ident] = $reference); }); }
Это плохо названо, именовать вещи сложно, но вы можете видеть, что целостность сохраняется синхронизацией, в то время как эти сгруппированные операции имеют место. Тот же метод можно также использовать для получения ссылки на другой объект с небольшим количеством настроек.
Я думаю, вы делаете что-то с этой конкретной ссылкой (которая всегда будет $this
настоящее время). Я не могу догадаться. Двигаемся дальше …
Я сделал предположение, что вы захотите что-то сделать с каждой из этих Threads
, и вы хотите сохранить целостность данных во время всей итерации:
$this->referee->foreach(function($ident, $reference){ var_dump(Thread::getCurrentThreadId(), $reference->getIdent(), $reference->isRunning()); });
Что вызывает:
public function foreach(Closure $closure) { $this->synchronized(function() use($closure) { foreach ($this as $ident => $reference) { $closure($ident, $reference); } }); }
Так вы бы так поступили.
Стоит упомянуть, что синхронизация здесь не обязательно требуется; так же, как ничего плохого произойдет, если вы удалите член из массива, который вы итерируете, ничего плохого не произойдет, если вы отмените или установите или сделаете что-нибудь еще для объекта во время итерации.