diff --git a/src/Threading/Thread.php b/src/Threading/Thread.php index 96156c3..1518015 100644 --- a/src/Threading/Thread.php +++ b/src/Threading/Thread.php @@ -90,7 +90,7 @@ class Thread implements ContextInterface, SynchronizableInterface */ public function isRunning() { - return $this->thread->isRunning() && $this->channel->isOpen(); + return $this->started && $this->thread->isRunning() && $this->channel->isOpen(); } /** @@ -154,6 +154,7 @@ class Thread implements ContextInterface, SynchronizableInterface $response = (yield $this->channel->receive()); if (!$response instanceof ExitStatusInterface) { + $this->kill(); throw new SynchronizationError('Did not receive an exit status from thread.'); } @@ -177,6 +178,7 @@ class Thread implements ContextInterface, SynchronizableInterface $data = (yield $this->channel->receive()); if ($data instanceof ExitStatusInterface) { + $this->kill(); $data = $data->getResult(); throw new SynchronizationError(sprintf( 'Thread unexpectedly exited with result of type: %s', @@ -197,6 +199,7 @@ class Thread implements ContextInterface, SynchronizableInterface } if ($data instanceof ExitStatusInterface) { + $this->kill(); throw new InvalidArgumentError('Cannot send exit status objects.'); } diff --git a/tests/Threading/ThreadTest.php b/tests/Threading/ThreadTest.php index 5203517..314a4ad 100644 --- a/tests/Threading/ThreadTest.php +++ b/tests/Threading/ThreadTest.php @@ -1,6 +1,7 @@ start(); - $this->assertEquals(42, (yield $thread->join())); + $this->assertSame(42, (yield $thread->join())); + })->done(); + + Loop\run(); + } + + public function testSendAndReceive() + { + Coroutine\create(function () { + $thread = new Thread(function () { + yield $this->send(1); + $value = (yield $this->receive()); + yield $value; + }); + + $value = 42; + + $thread->start(); + $this->assertSame(1, (yield $thread->receive())); + yield $thread->send($value); + $this->assertSame($value, (yield $thread->join())); + })->done(); + + Loop\run(); + } + + /** + * @depends testSendAndReceive + * @expectedException \Icicle\Concurrent\Exception\SynchronizationError + */ + public function testJoinWhenThreadSendingData() + { + Coroutine\create(function () { + $thread = new Thread(function () { + yield $this->send(0); + yield 42; + }); + + $thread->start(); + $value = (yield $thread->join()); + })->done(); + + Loop\run(); + } + + /** + * @depends testSendAndReceive + * @expectedException \Icicle\Concurrent\Exception\StatusError + */ + public function testReceiveBeforeThreadHasStarted() + { + Coroutine\create(function () { + $thread = new Thread(function () { + yield $this->send(0); + yield 42; + }); + + $value = (yield $thread->receive()); + })->done(); + + Loop\run(); + } + + /** + * @depends testSendAndReceive + * @expectedException \Icicle\Concurrent\Exception\StatusError + */ + public function testSendBeforeThreadHasStarted() + { + Coroutine\create(function () { + $thread = new Thread(function () { + yield $this->send(0); + yield 42; + }); + + yield $thread->send(0); + })->done(); + + Loop\run(); + } + + /** + * @depends testSendAndReceive + * @expectedException \Icicle\Concurrent\Exception\SynchronizationError + */ + public function testReceiveWhenThreadHasReturned() + { + Coroutine\create(function () { + $thread = new Thread(function () { + yield $this->send(0); + yield 42; + }); + + $thread->start(); + $value = (yield $thread->receive()); + $value = (yield $thread->receive()); + $value = (yield $thread->join()); + })->done(); + + Loop\run(); + } + + /** + * @depends testSendAndReceive + * @expectedException \Icicle\Concurrent\Exception\InvalidArgumentError + */ + public function testSendExitStatus() + { + Coroutine\create(function () { + $thread = new Thread(function () { + $value = (yield $this->receive()); + yield 42; + }); + + $thread->start(); + yield $thread->send(new ExitSuccess(0)); + $value = (yield $thread->join()); })->done(); Loop\run();