diff --git a/lib/Threading/Thread.php b/lib/Threading/Thread.php index d0bd962..c50a529 100644 --- a/lib/Threading/Thread.php +++ b/lib/Threading/Thread.php @@ -143,19 +143,21 @@ class Thread implements Strand { list($channel, $this->socket) = $sockets; - $this->thread = $thread = new Internal\Thread($this->socket, $this->function, $this->args); + $this->thread = new Internal\Thread($this->socket, $this->function, $this->args); if (!$this->thread->start(PTHREADS_INHERIT_INI)) { throw new ContextException('Failed to start the thread.'); } - $this->channel = $channel = new ChannelledSocket($channel, $channel); + $this->channel = new ChannelledSocket($channel, $channel); - $this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function () use ($thread, $channel) { - if (!$thread->isRunning()) { - $channel->close(); + $this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, function () { + if (!$this->thread->isRunning()) { + $this->channel->close(); } }); + + Loop::disable($this->watcher); } /** @@ -200,6 +202,7 @@ class Thread implements Strand { * * @throws StatusError Thrown if the context has not been started. * @throws SynchronizationError Thrown if an exit status object is not received. + * @throws ContextException If the context stops responding. */ public function join(): Promise { if ($this->channel == null || $this->thread === null) { @@ -214,9 +217,12 @@ class Thread implements Strand { * * @return \Generator * - * @throws \Amp\Parallel\SynchronizationError If the thread does not send an exit status. + * @throws SynchronizationError If the thread does not send an exit status. + * @throws ContextException If the context stops responding. */ private function doJoin(): \Generator { + Loop::enable($this->watcher); + try { $response = yield $this->channel->receive(); @@ -232,6 +238,7 @@ class Thread implements Strand { $this->kill(); throw $exception; } finally { + Loop::disable($this->watcher); $this->close(); } @@ -250,12 +257,16 @@ class Thread implements Strand { } private function doReceive() { + Loop::enable($this->watcher); + try { $data = yield $this->channel->receive(); } catch (ChannelException $exception) { throw new ContextException( "The context stopped responding, potentially due to a fatal error or calling exit", 0, $exception ); + } finally { + Loop::disable($this->watcher); } if ($data instanceof ExitResult) {