1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Fix & improve tests; add thread start error check

This commit is contained in:
coderstephen 2015-08-30 18:25:44 -05:00
parent 25fbe7a9a8
commit 1fe1374c8a
6 changed files with 134 additions and 39 deletions

View File

@ -96,7 +96,8 @@ class Thread implements ContextInterface, SynchronizableInterface
/** /**
* Spawns the thread and begins the thread's execution. * Spawns the thread and begins the thread's execution.
* *
* @throws StatusError If the thread has already been started. * @throws StatusError If the thread has already been started.
* @throws ThreadException If starting the thread was unsuccessful.
*/ */
public function start() public function start()
{ {
@ -104,7 +105,9 @@ class Thread implements ContextInterface, SynchronizableInterface
throw new StatusError('The thread has already been started.'); throw new StatusError('The thread has already been started.');
} }
$this->thread->start(PTHREADS_INHERIT_INI | PTHREADS_INHERIT_FUNCTIONS | PTHREADS_INHERIT_CLASSES); if (!$this->thread->start(PTHREADS_INHERIT_INI | PTHREADS_INHERIT_FUNCTIONS | PTHREADS_INHERIT_CLASSES)) {
throw new ThreadException('Failed to start the thread.');
}
$this->started = true; $this->started = true;
} }
@ -140,7 +143,7 @@ class Thread implements ContextInterface, SynchronizableInterface
public function join() public function join()
{ {
if (!$this->started) { if (!$this->started) {
throw new StatusError('The context has not been started.'); throw new StatusError('The thread has not been started.');
} }
try { try {
@ -164,7 +167,7 @@ class Thread implements ContextInterface, SynchronizableInterface
public function receive() public function receive()
{ {
if (!$this->started) { if (!$this->started) {
throw new StatusError('The context has not been started.'); throw new StatusError('The thread has not been started.');
} }
$data = (yield $this->channel->receive()); $data = (yield $this->channel->receive());
@ -186,7 +189,7 @@ class Thread implements ContextInterface, SynchronizableInterface
public function send($data) public function send($data)
{ {
if (!$this->started) { if (!$this->started) {
throw new StatusError('The context has not been started.'); throw new StatusError('The thread has not been started.');
} }
if ($data instanceof ExitStatusInterface) { if ($data instanceof ExitStatusInterface) {
@ -202,7 +205,7 @@ class Thread implements ContextInterface, SynchronizableInterface
public function synchronized(callable $callback) public function synchronized(callable $callback)
{ {
if (!$this->started) { if (!$this->started) {
throw new StatusError('The context has not been started.'); throw new StatusError('The thread has not been started.');
} }
while (!$this->thread->tsl()) { while (!$this->thread->tsl()) {

View File

@ -1,15 +0,0 @@
<?php
namespace Icicle\Tests\Concurrent\Sync;
use Icicle\Concurrent\Sync\ThreadedParcel;
/**
* @requires extension pthreads
*/
class ThreadedParcelTest extends AbstractParcelTest
{
protected function createParcel($value)
{
return new ThreadedParcel($value);
}
}

View File

@ -1,7 +1,7 @@
<?php <?php
namespace Icicle\Tests\Concurrent\Sync; namespace Icicle\Tests\Concurrent\Threading;
use Icicle\Concurrent\Sync\ThreadedMutex; use Icicle\Concurrent\Threading\Mutex;
use Icicle\Coroutine; use Icicle\Coroutine;
use Icicle\Loop; use Icicle\Loop;
use Icicle\Tests\Concurrent\TestCase; use Icicle\Tests\Concurrent\TestCase;
@ -10,12 +10,12 @@ use Icicle\Tests\Concurrent\TestCase;
* @group threading * @group threading
* @requires extension pthreads * @requires extension pthreads
*/ */
class ThreadedMutexTest extends TestCase class MutexTest extends TestCase
{ {
public function testAcquire() public function testAcquire()
{ {
Coroutine\create(function () { Coroutine\create(function () {
$mutex = new ThreadedMutex(); $mutex = new Mutex();
$lock = (yield $mutex->acquire()); $lock = (yield $mutex->acquire());
$lock->release(); $lock->release();
$this->assertTrue($lock->isReleased()); $this->assertTrue($lock->isReleased());
@ -30,7 +30,7 @@ class ThreadedMutexTest extends TestCase
$this->assertRunTimeBetween(function () { $this->assertRunTimeBetween(function () {
Coroutine\create(function () { Coroutine\create(function () {
$mutex = new ThreadedMutex(); $mutex = new Mutex();
$lock1 = (yield $mutex->acquire()); $lock1 = (yield $mutex->acquire());
Loop\timer(0.5, function () use ($lock1) { Loop\timer(0.5, function () use ($lock1) {

View File

@ -0,0 +1,16 @@
<?php
namespace Icicle\Tests\Concurrent\Threading;
use Icicle\Concurrent\Threading\Parcel;
use Icicle\Tests\Concurrent\Sync\AbstractParcelTest;
/**
* @requires extension pthreads
*/
class ParcelTest extends AbstractParcelTest
{
protected function createParcel($value)
{
return new Parcel($value);
}
}

View File

@ -1,7 +1,7 @@
<?php <?php
namespace Icicle\Tests\Concurrent\Sync; namespace Icicle\Tests\Concurrent\Threading;
use Icicle\Concurrent\Sync\ThreadedSemaphore; use Icicle\Concurrent\Threading\Semaphore;
use Icicle\Coroutine; use Icicle\Coroutine;
use Icicle\Loop; use Icicle\Loop;
use Icicle\Tests\Concurrent\TestCase; use Icicle\Tests\Concurrent\TestCase;
@ -10,18 +10,18 @@ use Icicle\Tests\Concurrent\TestCase;
* @group threading * @group threading
* @requires extension pthreads * @requires extension pthreads
*/ */
class ThreadedSemaphoreTest extends TestCase class SemaphoreTest extends TestCase
{ {
public function testCount() public function testCount()
{ {
$semaphore = new ThreadedSemaphore(1); $semaphore = new Semaphore(1);
$this->assertEquals(1, $semaphore->count()); $this->assertEquals(1, $semaphore->count());
} }
public function testAcquire() public function testAcquire()
{ {
Coroutine\create(function () { Coroutine\create(function () {
$semaphore = new ThreadedSemaphore(1); $semaphore = new Semaphore(1);
$lock = (yield $semaphore->acquire()); $lock = (yield $semaphore->acquire());
$lock->release(); $lock->release();
$this->assertTrue($lock->isReleased()); $this->assertTrue($lock->isReleased());
@ -36,7 +36,7 @@ class ThreadedSemaphoreTest extends TestCase
$this->assertRunTimeBetween(function () { $this->assertRunTimeBetween(function () {
Coroutine\create(function () { Coroutine\create(function () {
$semaphore = new ThreadedSemaphore(1); $semaphore = new Semaphore(1);
$lock1 = (yield $semaphore->acquire()); $lock1 = (yield $semaphore->acquire());
Loop\timer(0.5, function () use ($lock1) { Loop\timer(0.5, function () use ($lock1) {

View File

@ -15,12 +15,19 @@ class ThreadTest extends TestCase
public function testIsRunning() public function testIsRunning()
{ {
Coroutine\create(function () { Coroutine\create(function () {
$thread = Thread::spawn(function () { $thread = new Thread(function () {
sleep(1); usleep(100);
}); });
$this->assertFalse($thread->isRunning());
$thread->start();
$this->assertTrue($thread->isRunning()); $this->assertTrue($thread->isRunning());
yield $thread->join(); yield $thread->join();
$this->assertFalse($thread->isRunning());
})->done(); })->done();
Loop\run(); Loop\run();
@ -28,26 +35,79 @@ class ThreadTest extends TestCase
public function testKill() public function testKill()
{ {
$thread = Thread::spawn(function () { $thread = new Thread(function () {
sleep(1); usleep(100);
}); });
$thread->start();
$thread->kill(); $thread->kill();
$this->assertFalse($thread->isRunning()); $this->assertFalse($thread->isRunning());
Loop\run(); Loop\run();
} }
/**
* @expectedException \Icicle\Concurrent\Exception\StatusError
*/
public function testStartWhileRunningThrowsError()
{
$thread = new Thread(function () {
usleep(100);
});
$thread->start();
$thread->start();
}
/**
* @expectedException \Icicle\Concurrent\Exception\StatusError
*/
public function testStartMultipleTimesThrowsError()
{
Loop\loop();
$this->assertRunTimeBetween(function () {
Coroutine\create(function () {
$thread = new Thread(function () {
sleep(1);
});
$thread->start();
yield $thread->join();
$thread->start();
yield $thread->join();
})->done();
Loop\run();
}, 2, 2.2);
}
public function testSpawnStartsThread()
{
Coroutine\create(function () {
$thread = Thread::spawn(function () {
usleep(100);
});
yield $thread->join();
})->done();
Loop\run();
}
/** /**
* @expectedException \Icicle\Concurrent\Exception\PanicError * @expectedException \Icicle\Concurrent\Exception\PanicError
*/ */
public function testExceptionInThreadPanics() public function testExceptionInThreadPanics()
{ {
Coroutine\create(function () { Coroutine\create(function () {
$thread = Thread::spawn(function () { $thread = new Thread(function () {
throw new \Exception('Exception in thread.'); throw new \Exception('Exception in thread.');
}); });
$thread->start();
yield $thread->join(); yield $thread->join();
})->done(); })->done();
@ -56,18 +116,49 @@ class ThreadTest extends TestCase
public function testJoinWaitsForChild() public function testJoinWaitsForChild()
{ {
Loop\loop(Loop\create()); Loop\loop();
$this->assertRunTimeBetween(function () { $this->assertRunTimeBetween(function () {
Coroutine\create(function () { Coroutine\create(function () {
$thread = Thread::spawn(function () { $thread = new Thread(function () {
sleep(1); sleep(1);
}); });
$thread->start();
yield $thread->join(); yield $thread->join();
})->done(); })->done();
Loop\run(); Loop\run();
}, 1, 1.1); }, 1, 1.1);
} }
/**
* @expectedException \Icicle\Concurrent\Exception\StatusError
*/
public function testJoinWithoutStartThrowsError()
{
Coroutine\create(function () {
$thread = new Thread(function () {
usleep(100);
});
yield $thread->join();
})->done();
Loop\run();
}
public function testJoinResolvesWithThreadReturn()
{
Coroutine\create(function () {
$thread = new Thread(function () {
return 42;
});
$thread->start();
$this->assertEquals(42, (yield $thread->join()));
})->done();
Loop\run();
}
} }