1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 12:24:40 +01:00

Upgrade to PHPUnit 7 or 8

This commit is contained in:
Aaron Piotrowski 2019-08-27 12:17:41 -05:00
parent c73c6d0a77
commit 24213ee911
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
27 changed files with 736 additions and 904 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
.php_cs.cache
.phpunit.result.cache
.vagrant
build
composer.lock

View File

@ -21,15 +21,16 @@
}
],
"require": {
"php": ">=7.1",
"amphp/amp": "^2",
"amphp/byte-stream": "^1.5",
"amphp/byte-stream": "^1.6.1",
"amphp/parser": "^1",
"amphp/process": "^1",
"amphp/sync": "^1.0.1"
},
"require-dev": {
"phpunit/phpunit": "^6",
"amphp/phpunit-util": "^1",
"phpunit/phpunit": "^8 || ^7",
"amphp/phpunit-util": "^1.1",
"amphp/php-cs-fixer-config": "dev-master"
},
"autoload": {
@ -46,11 +47,6 @@
"Amp\\Parallel\\Test\\": "test"
}
},
"config": {
"platform": {
"php": "7.0.0"
}
},
"scripts": {
"check": [
"@cs",

View File

@ -27,10 +27,7 @@
</exclude>
</whitelist>
</filter>
<listeners>
<listener class="Amp\PHPUnit\LoopReset"/>
</listeners>
<logging>
<log type="coverage-html" target="build/coverage" title="Amp" highlight="true"/>
<log type="coverage-html" target="build/coverage"/>
</logging>
</phpunit>

View File

@ -3,144 +3,119 @@
namespace Amp\Parallel\Test\Context;
use Amp\Delayed;
use Amp\Loop;
use Amp\Parallel\Context\Context;
use Amp\PHPUnit\TestCase;
use Amp\Parallel\Context\ContextException;
use Amp\Parallel\Sync\PanicError;
use Amp\PHPUnit\AsyncTestCase;
abstract class AbstractContextTest extends TestCase
abstract class AbstractContextTest extends AsyncTestCase
{
abstract public function createContext($script): Context;
public function testBasicProcess()
{
Loop::run(function () {
$context = $this->createContext([
$context = $this->createContext([
__DIR__ . "/Fixtures/test-process.php",
"Test"
]);
yield $context->start();
$this->assertSame("Test", yield $context->join());
});
yield $context->start();
$this->assertSame("Test", yield $context->join());
}
/**
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage No string provided
*/
public function testFailingProcess()
{
Loop::run(function () {
$context = $this->createContext(__DIR__ . "/Fixtures/test-process.php");
yield $context->start();
yield $context->join();
});
$this->expectException(PanicError::class);
$this->expectExceptionMessage('No string provided');
$context = $this->createContext(__DIR__ . "/Fixtures/test-process.php");
yield $context->start();
yield $context->join();
}
/**
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage No script found at '../test-process.php'
*/
public function testInvalidScriptPath()
{
Loop::run(function () {
$context = $this->createContext("../test-process.php");
yield $context->start();
yield $context->join();
});
$this->expectException(PanicError::class);
$this->expectExceptionMessage("No script found at '../test-process.php'");
$context = $this->createContext("../test-process.php");
yield $context->start();
yield $context->join();
}
/**
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage The given data cannot be sent because it is not serializable
*/
public function testInvalidResult()
{
Loop::run(function () {
$context = $this->createContext(__DIR__ . "/Fixtures/invalid-result-process.php");
yield $context->start();
\var_dump(yield $context->join());
});
$this->expectException(PanicError::class);
$this->expectExceptionMessage('The given data cannot be sent because it is not serializable');
$context = $this->createContext(__DIR__ . "/Fixtures/invalid-result-process.php");
yield $context->start();
\var_dump(yield $context->join());
}
/**
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage did not return a callable function
*/
public function testNoCallbackReturned()
{
Loop::run(function () {
$context = $this->createContext(__DIR__ . "/Fixtures/no-callback-process.php");
yield $context->start();
\var_dump(yield $context->join());
});
$this->expectException(PanicError::class);
$this->expectExceptionMessage('did not return a callable function');
$context = $this->createContext(__DIR__ . "/Fixtures/no-callback-process.php");
yield $context->start();
\var_dump(yield $context->join());
}
/**
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage contains a parse error
*/
public function testParseError()
{
Loop::run(function () {
$context = $this->createContext(__DIR__ . "/Fixtures/parse-error-process.inc");
yield $context->start();
\var_dump(yield $context->join());
});
$this->expectException(PanicError::class);
$this->expectExceptionMessage('contains a parse error');
$context = $this->createContext(__DIR__ . "/Fixtures/parse-error-process.inc");
yield $context->start();
yield $context->join();
}
/**
* @expectedException \Amp\Parallel\Context\ContextException
* @expectedExceptionMessage Failed to receive result
*/
public function testKillWhenJoining()
{
Loop::run(function () {
$context = $this->createContext([
$this->expectException(ContextException::class);
$this->expectExceptionMessage('Failed to receive result');
$context = $this->createContext([
__DIR__ . "/Fixtures/delayed-process.php",
5,
]);
yield $context->start();
yield new Delayed(100);
$promise = $context->join();
$context->kill();
$this->assertFalse($context->isRunning());
yield $promise;
});
yield $context->start();
yield new Delayed(100);
$promise = $context->join();
$context->kill();
$this->assertFalse($context->isRunning());
yield $promise;
}
/**
* @expectedException \Amp\Parallel\Context\ContextException
* @expectedExceptionMessage Failed to receive result
*/
public function testKillBusyContext()
{
Loop::run(function () {
$context = $this->createContext([
$this->expectException(ContextException::class);
$this->expectExceptionMessage('Failed to receive result');
$context = $this->createContext([
__DIR__ . "/Fixtures/sleep-process.php",
5,
]);
yield $context->start();
yield new Delayed(100);
$promise = $context->join();
$context->kill();
$this->assertFalse($context->isRunning());
yield $promise;
});
yield $context->start();
yield new Delayed(100);
$promise = $context->join();
$context->kill();
$this->assertFalse($context->isRunning());
yield $promise;
}
/**
* @expectedException \Amp\Parallel\Context\ContextException
* @expectedExceptionMessage Failed to receive result
*/
public function testExitingProcess()
{
Loop::run(function () {
$context = $this->createContext([
$this->expectException(ContextException::class);
$this->expectExceptionMessage('Failed to receive result');
$context = $this->createContext([
__DIR__ . "/Fixtures/exiting-process.php",
5,
]);
yield $context->start();
yield $context->join();
});
yield $context->start();
yield $context->join();
}
}

View File

@ -2,7 +2,6 @@
namespace Amp\Parallel\Test\Context;
use Amp\Loop;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\Parallel;
@ -18,41 +17,37 @@ class ParallelTest extends AbstractContextTest
public function testGetId()
{
Loop::run(function () {
$context = $this->createContext([
$context = $this->createContext([
__DIR__ . "/Fixtures/test-process.php",
"Test"
]);
yield $context->start();
$this->assertInternalType('int', $context->getId());
yield $context->join();
yield $context->start();
$this->assertIsInt($context->getId());
yield $context->join();
$context = $this->createContext([
$context = $this->createContext([
__DIR__ . "/Fixtures/test-process.php",
"Test"
]);
$this->expectException(\Error::class);
$this->expectExceptionMessage('The thread has not been started');
$this->expectException(\Error::class);
$this->expectExceptionMessage('The thread has not been started');
$context->getId();
});
$context->getId();
}
public function testRunStartsThread()
{
Loop::run(function () {
$thread = yield Parallel::run([
$thread = yield Parallel::run([
__DIR__ . "/Fixtures/test-process.php",
"Test"
]);
$this->assertInstanceOf(Parallel::class, $thread);
$this->assertTrue($thread->isRunning());
$this->assertInternalType('int', $thread->getId());
$this->assertInstanceOf(Parallel::class, $thread);
$this->assertTrue($thread->isRunning());
$this->assertIsInt($thread->getId());
return yield $thread->join();
});
return yield $thread->join();
}
}

View File

@ -3,16 +3,20 @@
namespace Amp\Parallel\Test\Context;
use Amp\Delayed;
use Amp\Loop;
use Amp\Parallel\Context\ContextException;
use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Context\Thread;
use Amp\Parallel\Sync\Channel;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ExitSuccess;
use Amp\PHPUnit\TestCase;
use Amp\Parallel\Sync\PanicError;
use Amp\Parallel\Sync\SynchronizationError;
use Amp\PHPUnit\AsyncTestCase;
/**
* @requires extension pthreads
*/
class ThreadTest extends TestCase
class ThreadTest extends AsyncTestCase
{
/**
* @param callable $function
@ -26,329 +30,289 @@ class ThreadTest extends TestCase
public function testIsRunning()
{
Loop::run(function () {
$context = $this->createContext(function () {
\usleep(100);
});
$this->assertFalse($context->isRunning());
yield $context->start();
$this->assertTrue($context->isRunning());
yield $context->join();
$this->assertFalse($context->isRunning());
$context = $this->createContext(function () {
\usleep(100);
});
$this->assertFalse($context->isRunning());
yield $context->start();
$this->assertTrue($context->isRunning());
yield $context->join();
$this->assertFalse($context->isRunning());
}
public function testKill()
{
Loop::run(function () {
$context = $this->createContext(function () {
\usleep(1e6);
});
$this->setTimeout(1000);
yield $context->start();
$this->assertRunTimeLessThan([$context, 'kill'], 1000);
$this->assertFalse($context->isRunning());
$context = $this->createContext(function () {
\usleep(1e6);
});
yield $context->start();
$context->kill();
$this->assertFalse($context->isRunning());
}
/**
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testStartWhileRunningThrowsError()
{
Loop::run(function () {
$context = $this->createContext(function () {
\usleep(100);
});
$this->expectException(StatusError::class);
yield $context->start();
yield $context->start();
$context = $this->createContext(function () {
\usleep(100);
});
yield $context->start();
yield $context->start();
}
/**
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testStartMultipleTimesThrowsError()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = $this->createContext(function () {
\sleep(1);
});
$this->expectException(StatusError::class);
yield $context->start();
yield $context->join();
$this->setMinimumRuntime(2000);
yield $context->start();
yield $context->join();
});
}, 2000);
$context = $this->createContext(function () {
\sleep(1);
});
yield $context->start();
yield $context->join();
yield $context->start();
yield $context->join();
}
/**
* @expectedException \Amp\Parallel\Sync\PanicError
*/
public function testExceptionInContextPanics()
{
Loop::run(function () {
$context = $this->createContext(function () {
throw new \Exception('Exception in fork.');
});
$this->expectException(PanicError::class);
yield $context->start();
yield $context->join();
$context = $this->createContext(function () {
throw new \Exception('Exception in fork.');
});
yield $context->start();
yield $context->join();
}
/**
* @expectedException \Amp\Parallel\Sync\PanicError
*/
public function testReturnUnserializableDataPanics()
{
Loop::run(function () {
$context = $this->createContext(function () {
return yield function () {};
});
$this->expectException(PanicError::class);
yield $context->start();
yield $context->join();
$context = $this->createContext(function () {
return yield function () {};
});
yield $context->start();
yield $context->join();
}
public function testJoinWaitsForChild()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = $this->createContext(function () {
\sleep(1);
});
$this->setMinimumRuntime(1000);
yield $context->start();
yield $context->join();
});
}, 1000);
$context = $this->createContext(function () {
\sleep(1);
return 1;
});
yield $context->start();
$this->assertSame(1, yield $context->join());
}
/**
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testJoinWithoutStartThrowsError()
{
Loop::run(function () {
$context = $this->createContext(function () {
\usleep(100);
});
$this->expectException(StatusError::class);
yield $context->join();
$context = $this->createContext(function () {
\usleep(100);
});
yield $context->join();
}
public function testJoinResolvesWithContextReturn()
{
Loop::run(function () {
$context = $this->createContext(function () {
return 42;
});
yield $context->start();
$this->assertSame(42, yield $context->join());
$context = $this->createContext(function () {
return 42;
});
yield $context->start();
$this->assertSame(42, yield $context->join());
}
public function testSendAndReceive()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(1);
$value = yield $channel->receive();
return $value;
});
$value = 42;
yield $context->start();
$this->assertSame(1, yield $context->receive());
yield $context->send($value);
$this->assertSame($value, yield $context->join());
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(1);
$value = yield $channel->receive();
return $value;
});
$value = 42;
yield $context->start();
$this->assertSame(1, yield $context->receive());
yield $context->send($value);
$this->assertSame($value, yield $context->join());
}
/**
* @depends testSendAndReceive
* @expectedException \Amp\Parallel\Sync\SynchronizationError
*/
public function testJoinWhenContextSendingData()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
return 42;
});
$this->expectException(SynchronizationError::class);
yield $context->start();
$value = yield $context->join();
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
return 42;
});
yield $context->start();
$value = yield $context->join();
}
/**
* @depends testSendAndReceive
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testReceiveBeforeContextHasStarted()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
return 42;
});
$this->expectException(StatusError::class);
$value = yield $context->receive();
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
return 42;
});
$value = yield $context->receive();
}
/**
* @depends testSendAndReceive
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testSendBeforeContextHasStarted()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
return 42;
});
$this->expectException(StatusError::class);
yield $context->send(0);
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
return 42;
});
yield $context->send(0);
}
/**
* @depends testSendAndReceive
* @expectedException \Amp\Parallel\Sync\SynchronizationError
*/
public function testReceiveWhenContextHasReturned()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
return 42;
});
$this->expectException(SynchronizationError::class);
yield $context->start();
$value = yield $context->receive();
$value = yield $context->receive();
$value = yield $context->join();
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
return 42;
});
yield $context->start();
$value = yield $context->receive();
$value = yield $context->receive();
$value = yield $context->join();
}
/**
* @depends testSendAndReceive
* @expectedException \Error
*/
public function testSendExitResult()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
$value = yield $channel->receive();
return 42;
});
$this->expectException(\Error::class);
yield $context->start();
yield $context->send(new ExitSuccess(0));
$value = yield $context->join();
$context = $this->createContext(function (Channel $channel) {
$value = yield $channel->receive();
return 42;
});
yield $context->start();
yield $context->send(new ExitSuccess(0));
$value = yield $context->join();
}
/**
* @expectedException \Amp\Parallel\Context\ContextException
* @expectedExceptionMessage Failed to receive result
*/
public function testExitingContextOnJoin()
{
Loop::run(function () {
$context = $this->createContext(function () {
exit;
});
$this->expectException(ContextException::class);
$this->expectExceptionMessage('Failed to receive result');
yield $context->start();
$value = yield $context->join();
$context = $this->createContext(function () {
exit;
});
yield $context->start();
$value = yield $context->join();
}
/**
* @expectedException \Amp\Parallel\Sync\ChannelException
* @expectedExceptionMessage The channel closed unexpectedly
*/
public function testExitingContextOnReceive()
{
Loop::run(function () {
$context = $this->createContext(function () {
exit;
});
$this->expectException(ChannelException::class);
$this->expectExceptionMessage('The channel closed unexpectedly');
yield $context->start();
$value = yield $context->receive();
$context = $this->createContext(function () {
exit;
});
yield $context->start();
$value = yield $context->receive();
}
/**
* @expectedException \Amp\Parallel\Sync\ChannelException
* @expectedExceptionMessage Sending on the channel failed
*/
public function testExitingContextOnSend()
{
Loop::run(function () {
$context = $this->createContext(function () {
yield new Delayed(1000);
exit;
});
$this->expectException(ChannelException::class);
$this->expectExceptionMessage('Sending on the channel failed');
yield $context->start();
yield $context->send(\str_pad("", 1024 * 1024, "-"));
$context = $this->createContext(function () {
yield new Delayed(1000);
exit;
});
yield $context->start();
yield $context->send(\str_pad("", 1024 * 1024, "-"));
}
public function testGetId()
{
Loop::run(function () {
$context = $this->createContext(function () {
yield new Delayed(100);
});
yield $context->start();
$this->assertInternalType('int', $context->getId());
yield $context->join();
$context = $this->createContext(function () {
yield new Delayed(100);
});
$this->expectException(\Error::class);
$this->expectExceptionMessage('The thread has not been started');
$context->getId();
$context = $this->createContext(function () {
yield new Delayed(100);
});
yield $context->start();
$this->assertIsInt($context->getId());
yield $context->join();
$context = $this->createContext(function () {
yield new Delayed(100);
});
$this->expectException(\Error::class);
$this->expectExceptionMessage('The thread has not been started');
$context->getId();
}
public function testRunStartsThread()
{
Loop::run(function () {
$thread = yield Thread::run(function () {
\usleep(100);
});
$this->assertInstanceOf(Thread::class, $thread);
$this->assertTrue($thread->isRunning());
return yield $thread->join();
$thread = yield Thread::run(function () {
\usleep(100);
});
$this->assertInstanceOf(Thread::class, $thread);
$this->assertTrue($thread->isRunning());
return yield $thread->join();
}
}

View File

@ -2,27 +2,24 @@
namespace Amp\Parallel\Test\Sync;
use Amp\PHPUnit\TestCase;
use Amp\Promise;
use Amp\Parallel\Sync\Parcel;
use Amp\PHPUnit\AsyncTestCase;
abstract class AbstractParcelTest extends TestCase
abstract class AbstractParcelTest extends AsyncTestCase
{
/**
* @return \Amp\Parallel\Sync\Parcel
*/
abstract protected function createParcel($value);
abstract protected function createParcel($value): Parcel;
public function testUnwrapIsOfCorrectType()
{
$object = $this->createParcel(new \stdClass);
$this->assertInstanceOf('stdClass', Promise\wait($object->unwrap()));
$this->assertInstanceOf('stdClass', yield $object->unwrap());
}
public function testUnwrapIsEqual()
{
$object = new \stdClass;
$shared = $this->createParcel($object);
$this->assertEquals($object, Promise\wait($shared->unwrap()));
$this->assertEquals($object, yield $shared->unwrap());
}
/**

View File

@ -2,40 +2,39 @@
namespace Amp\Parallel\Test\Sync;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ChannelParser;
use Amp\PHPUnit\TestCase;
use Amp\Parallel\Sync\SerializationException;
use Amp\PHPUnit\AsyncTestCase;
class ChannelParserTest extends TestCase
class ChannelParserTest extends AsyncTestCase
{
/**
* @expectedException \Amp\Parallel\Sync\SerializationException
* @expectedExceptionMessage Exception thrown when unserializing data
*/
public function testCorruptedData()
{
$this->expectException(SerializationException::class);
$this->expectExceptionMessage('Exception thrown when unserializing data');
$data = "Invalid serialized data";
$data = \pack("CL", 0, \strlen($data)) . $data;
$parser = new ChannelParser($this->createCallback(0));
$parser->push($data);
}
/**
* @expectedException \Amp\Parallel\Sync\ChannelException
* @expectedExceptionMessage Invalid packet received: Invalid packet
*/
public function testInvalidHeaderData()
{
$this->expectException(ChannelException::class);
$this->expectExceptionMessage('Invalid packet received: Invalid packet');
$data = "Invalid packet";
$parser = new ChannelParser($this->createCallback(0));
$parser->push($data);
}
/**
* @expectedException \Amp\Parallel\Sync\ChannelException
* @expectedExceptionMessage Invalid packet received: B \xf3\xf2\x0\x1
*/
public function testInvalidHeaderBinaryData()
{
$this->expectException(ChannelException::class);
$this->expectExceptionMessage('Invalid packet received: B \xf3\xf2\x0\x1');
$data = "\x42\x20\xf3\xf2\x00\x01";
$parser = new ChannelParser($this->createCallback(0));
$parser->push($data);

View File

@ -2,11 +2,12 @@
namespace Amp\Parallel\Test\Sync;
use Amp\Loop;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\PHPUnit\TestCase;
use Amp\Parallel\Sync\SerializationException;
use Amp\PHPUnit\AsyncTestCase;
class ChannelledSocketTest extends TestCase
class ChannelledSocketTest extends AsyncTestCase
{
/**
* @return resource[]
@ -25,17 +26,15 @@ class ChannelledSocketTest extends TestCase
public function testSendReceive()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$b = new ChannelledSocket($right, $right);
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$b = new ChannelledSocket($right, $right);
$message = 'hello';
$message = 'hello';
yield $a->send($message);
$data = yield $b->receive();
$this->assertSame($message, $data);
});
yield $a->send($message);
$data = yield $b->receive();
$this->assertSame($message, $data);
}
/**
@ -43,83 +42,77 @@ class ChannelledSocketTest extends TestCase
*/
public function testSendReceiveLongData()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$b = new ChannelledSocket($right, $right);
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$b = new ChannelledSocket($right, $right);
$length = 0xffff;
$message = '';
for ($i = 0; $i < $length; ++$i) {
$message .= \chr(\mt_rand(0, 255));
}
$length = 0xffff;
$message = '';
for ($i = 0; $i < $length; ++$i) {
$message .= \chr(\mt_rand(0, 255));
}
$a->send($message);
$data = yield $b->receive();
$this->assertSame($message, $data);
});
$a->send($message);
$data = yield $b->receive();
$this->assertSame($message, $data);
}
/**
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testInvalidDataReceived()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$b = new ChannelledSocket($right, $right);
$this->expectException(ChannelException::class);
\fwrite($left, \pack('L', 10) . '1234567890');
$data = yield $b->receive();
});
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$b = new ChannelledSocket($right, $right);
\fwrite($left, \pack('L', 10) . '1234567890');
$data = yield $b->receive();
}
/**
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\SerializationException
*/
public function testSendUnserializableData()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$b = new ChannelledSocket($right, $right);
$this->expectException(SerializationException::class);
// Close $a. $b should close on next read...
yield $a->send(function () {});
$data = yield $b->receive();
});
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$b = new ChannelledSocket($right, $right);
// Close $a. $b should close on next read...
yield $a->send(function () {});
$data = yield $b->receive();
}
/**
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testSendAfterClose()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$a->close();
$this->expectException(ChannelException::class);
yield $a->send('hello');
});
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$a->close();
yield $a->send('hello');
}
/**
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testReceiveAfterClose()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$a->close();
$this->expectException(ChannelException::class);
$data = yield $a->receive();
});
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$a->close();
$data = yield $a->receive();
}
}

View File

@ -5,13 +5,14 @@ namespace Amp\Parallel\Test\Sync;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\StreamException;
use Amp\Loop;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ChannelledStream;
use Amp\PHPUnit\TestCase;
use Amp\Parallel\Sync\SerializationException;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Promise;
use Amp\Success;
class ChannelledStreamTest extends TestCase
class ChannelledStreamTest extends AsyncTestCase
{
/**
* @return \Amp\ByteStream\InputStream|\Amp\ByteStream\OutputStream
@ -48,17 +49,15 @@ class ChannelledStreamTest extends TestCase
public function testSendReceive()
{
Loop::run(function () {
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
$b = new ChannelledStream($mock, $mock);
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
$b = new ChannelledStream($mock, $mock);
$message = 'hello';
$message = 'hello';
yield $a->send($message);
$data = yield $b->receive();
$this->assertSame($message, $data);
});
yield $a->send($message);
$data = yield $b->receive();
$this->assertSame($message, $data);
}
/**
@ -66,94 +65,88 @@ class ChannelledStreamTest extends TestCase
*/
public function testSendReceiveLongData()
{
Loop::run(function () {
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
$b = new ChannelledStream($mock, $mock);
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
$b = new ChannelledStream($mock, $mock);
$length = 0xffff;
$message = '';
for ($i = 0; $i < $length; ++$i) {
$message .= \chr(\mt_rand(0, 255));
}
$length = 0xffff;
$message = '';
for ($i = 0; $i < $length; ++$i) {
$message .= \chr(\mt_rand(0, 255));
}
yield $a->send($message);
$data = yield $b->receive();
$this->assertSame($message, $data);
});
yield $a->send($message);
$data = yield $b->receive();
$this->assertSame($message, $data);
}
/**
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testInvalidDataReceived()
{
Loop::run(function () {
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
$b = new ChannelledStream($mock, $mock);
$this->expectException(ChannelException::class);
// Close $a. $b should close on next read...
yield $mock->write(\pack('L', 10) . '1234567890');
$data = yield $b->receive();
});
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
$b = new ChannelledStream($mock, $mock);
// Close $a. $b should close on next read...
yield $mock->write(\pack('L', 10) . '1234567890');
$data = yield $b->receive();
}
/**
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\SerializationException
*/
public function testSendUnserializableData()
{
Loop::run(function () {
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
$b = new ChannelledStream($mock, $mock);
$this->expectException(SerializationException::class);
// Close $a. $b should close on next read...
yield $a->send(function () {});
$data = yield $b->receive();
});
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
$b = new ChannelledStream($mock, $mock);
// Close $a. $b should close on next read...
yield $a->send(function () {});
$data = yield $b->receive();
}
/**
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testSendAfterClose()
{
Loop::run(function () {
$mock = $this->createMock(OutputStream::class);
$mock->expects($this->once())
$this->expectException(ChannelException::class);
$mock = $this->createMock(OutputStream::class);
$mock->expects($this->once())
->method('write')
->will($this->throwException(new StreamException));
$a = new ChannelledStream($this->createMock(InputStream::class), $mock);
$b = new ChannelledStream(
$a = new ChannelledStream($this->createMock(InputStream::class), $mock);
$b = new ChannelledStream(
$this->createMock(InputStream::class),
$this->createMock(OutputStream::class)
);
yield $a->send('hello');
});
yield $a->send('hello');
}
/**
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testReceiveAfterClose()
{
Loop::run(function () {
$mock = $this->createMock(InputStream::class);
$mock->expects($this->once())
$this->expectException(ChannelException::class);
$mock = $this->createMock(InputStream::class);
$mock->expects($this->once())
->method('read')
->willReturn(new Success(null));
$a = new ChannelledStream($mock, $this->createMock(OutputStream::class));
$a = new ChannelledStream($mock, $this->createMock(OutputStream::class));
$data = yield $a->receive();
});
$data = yield $a->receive();
}
}

View File

@ -4,9 +4,9 @@ namespace Amp\Parallel\Test\Sync;
use Amp\Parallel\Sync\ExitFailure;
use Amp\Parallel\Sync\PanicError;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class ExitFailureTest extends TestCase
class ExitFailureTest extends AsyncTestCase
{
public function testGetResult()
{

View File

@ -3,9 +3,9 @@
namespace Amp\Parallel\Test\Sync;
use Amp\Parallel\Sync\ExitSuccess;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class ExitSuccessTest extends TestCase
class ExitSuccessTest extends AsyncTestCase
{
public function testGetResult()
{

View File

@ -0,0 +1,19 @@
<?php
namespace Amp\Parallel\Test\Sync\Fixture;
use Amp\Parallel\Sync\SharedMemoryParcel;
return function () use ($argv): \Generator {
if (!isset($argv[1])) {
throw new \Error('No parcel ID given');
}
$parcel = SharedMemoryParcel::use($argv[1]);
yield $parcel->synchronized(function (int $value): int {
return $value + 1;
});
return yield $parcel->unwrap();
};

View File

@ -2,9 +2,10 @@
namespace Amp\Parallel\Test\Sync;
use Amp\Loop;
use Amp\Delayed;
use Amp\Parallel\Context\Process;
use Amp\Parallel\Sync\Parcel;
use Amp\Parallel\Sync\SharedMemoryParcel;
use Amp\Promise;
/**
* @requires extension shmop
@ -16,13 +17,13 @@ class SharedMemoryParcelTest extends AbstractParcelTest
private $parcel;
protected function createParcel($value)
protected function createParcel($value): Parcel
{
$this->parcel = SharedMemoryParcel::create(self::ID, $value);
return $this->parcel;
}
public function tearDown()
public function tearDown(): void
{
$this->parcel = null;
}
@ -30,12 +31,11 @@ class SharedMemoryParcelTest extends AbstractParcelTest
public function testObjectOverflowMoved()
{
$object = SharedMemoryParcel::create(self::ID, 'hi', 2);
$awaitable = $object->synchronized(function () {
yield $object->synchronized(function () {
return 'hello world';
});
Promise\wait($awaitable);
$this->assertEquals('hello world', Promise\wait($object->unwrap()));
$this->assertEquals('hello world', yield $object->unwrap());
}
/**
@ -46,36 +46,19 @@ class SharedMemoryParcelTest extends AbstractParcelTest
{
$object = SharedMemoryParcel::create(self::ID, 42);
$this->doInFork(function () use ($object) {
$awaitable = $object->synchronized(function ($value) {
return $value + 1;
});
Promise\wait($awaitable);
$process = new Process([__DIR__ . '/Fixture/parcel.php', self::ID]);
$promise = $object->synchronized(function (int $value): \Generator {
$this->assertSame(42, $value);
yield new Delayed(500); // Child must wait until parent finishes with parcel.
return $value + 1;
});
$this->assertEquals(43, Promise\wait($object->unwrap()));
}
yield $process->start();
/**
* @group posix
* @requires extension pcntl
*/
public function testInSeparateProcess()
{
$parcel = SharedMemoryParcel::create(self::ID, 42);
$this->assertSame(43, yield $promise);
$this->doInFork(function () {
Loop::run(function () {
$parcel = SharedMemoryParcel::use(self::ID);
$this->assertSame(43, yield $parcel->synchronized(function ($value) {
$this->assertSame(42, $value);
return $value + 1;
}));
});
});
Loop::run(function () use ($parcel) {
$this->assertSame(43, yield $parcel->unwrap());
});
$this->assertSame(44, yield $process->join()); // Wait for child process to finish.
$this->assertEquals(44, yield $object->unwrap());
}
}

View File

@ -2,9 +2,9 @@
namespace Amp\Parallel\Test\Sync;
use Amp\Loop;
use Amp\Parallel\Context\Thread;
use Amp\Parallel\Sync\Channel;
use Amp\Parallel\Sync\Parcel;
use Amp\Parallel\Sync\ThreadedParcel;
/**
@ -12,26 +12,24 @@ use Amp\Parallel\Sync\ThreadedParcel;
*/
class ThreadedParcelTest extends AbstractParcelTest
{
protected function createParcel($value)
protected function createParcel($value): Parcel
{
return new ThreadedParcel($value);
}
public function testWithinThread()
{
Loop::run(function () {
$value = 1;
$parcel = new ThreadedParcel($value);
$value = 1;
$parcel = new ThreadedParcel($value);
$thread = yield Thread::run(function (Channel $channel, ThreadedParcel $parcel) {
$parcel->synchronized(function (int $value) {
return $value + 1;
});
return 0;
}, $parcel);
$thread = yield Thread::run(function (Channel $channel, ThreadedParcel $parcel) {
$parcel->synchronized(function (int $value) {
return $value + 1;
});
return 0;
}, $parcel);
$this->assertSame(0, yield $thread->join());
$this->assertSame($value + 1, yield $parcel->unwrap());
});
$this->assertSame(0, yield $thread->join());
$this->assertSame($value + 1, yield $parcel->unwrap());
}
}

View File

@ -2,14 +2,14 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Loop;
use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Worker\Pool;
use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\Worker;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Promise;
abstract class AbstractPoolTest extends TestCase
abstract class AbstractPoolTest extends AsyncTestCase
{
/**
* @param int $min
@ -21,54 +21,45 @@ abstract class AbstractPoolTest extends TestCase
public function testIsRunning()
{
Loop::run(function () {
$pool = $this->createPool();
$pool = $this->createPool();
$this->assertTrue($pool->isRunning());
$this->assertTrue($pool->isRunning());
yield $pool->shutdown();
$this->assertFalse($pool->isRunning());
});
yield $pool->shutdown();
$this->assertFalse($pool->isRunning());
}
public function testIsIdleOnStart()
{
Loop::run(function () {
$pool = $this->createPool();
$pool = $this->createPool();
$this->assertTrue($pool->isIdle());
$this->assertTrue($pool->isIdle());
yield $pool->shutdown();
});
yield $pool->shutdown();
}
public function testShutdownShouldReturnSameResult()
{
Loop::run(function () {
$pool = $this->createPool();
$pool = $this->createPool();
$this->assertTrue($pool->isIdle());
$this->assertTrue($pool->isIdle());
$result = yield $pool->shutdown();
$this->assertSame($result, yield $pool->shutdown());
});
$result = yield $pool->shutdown();
$this->assertSame($result, yield $pool->shutdown());
}
/**
* @expectedException \Amp\Parallel\Context\StatusError
* @expectedExceptionMessage The pool was shutdown
*/
public function testPullShouldThrowStatusError()
{
Loop::run(function () {
$pool = $this->createPool();
$this->expectException(StatusError::class);
$this->expectExceptionMessage('The pool was shutdown');
$this->assertTrue($pool->isIdle());
$pool = $this->createPool();
yield $pool->shutdown();
$this->assertTrue($pool->isIdle());
$pool->getWorker();
});
yield $pool->shutdown();
$pool->getWorker();
}
public function testGetMaxSize()
@ -79,77 +70,105 @@ abstract class AbstractPoolTest extends TestCase
public function testWorkersIdleOnStart()
{
Loop::run(function () {
$pool = $this->createPool();
$pool = $this->createPool();
$this->assertEquals(0, $pool->getIdleWorkerCount());
$this->assertEquals(0, $pool->getIdleWorkerCount());
yield $pool->shutdown();
});
yield $pool->shutdown();
}
public function testEnqueue()
{
Loop::run(function () {
$pool = $this->createPool();
$pool = $this->createPool();
$returnValue = yield $pool->enqueue(new Fixtures\TestTask(42));
$this->assertEquals(42, $returnValue);
$returnValue = yield $pool->enqueue(new Fixtures\TestTask(42));
$this->assertEquals(42, $returnValue);
yield $pool->shutdown();
});
yield $pool->shutdown();
}
public function testEnqueueMultiple()
{
Loop::run(function () {
$pool = $this->createPool();
$pool = $this->createPool();
$values = yield \Amp\Promise\all([
$values = yield \Amp\Promise\all([
$pool->enqueue(new Fixtures\TestTask(42)),
$pool->enqueue(new Fixtures\TestTask(56)),
$pool->enqueue(new Fixtures\TestTask(72))
]);
$this->assertEquals([42, 56, 72], $values);
$this->assertEquals([42, 56, 72], $values);
yield $pool->shutdown();
});
yield $pool->shutdown();
}
public function testKill()
{
$this->setTimeout(1000);
$pool = $this->createPool();
$this->assertRunTimeLessThan([$pool, 'kill'], 1000);
$pool->kill();
$this->assertFalse($pool->isRunning());
}
public function testGet()
{
Loop::run(function () {
$pool = $this->createPool();
$pool = $this->createPool();
$worker = $pool->getWorker();
$this->assertInstanceOf(Worker::class, $worker);
$worker = $pool->getWorker();
$this->assertInstanceOf(Worker::class, $worker);
$this->assertTrue($worker->isRunning());
$this->assertTrue($worker->isIdle());
$this->assertTrue($worker->isRunning());
$this->assertTrue($worker->isIdle());
$this->assertSame(42, yield $worker->enqueue(new Fixtures\TestTask(42)));
$this->assertSame(42, yield $worker->enqueue(new Fixtures\TestTask(42)));
yield $worker->shutdown();
yield $worker->shutdown();
$worker->kill();
});
$worker->kill();
}
public function testBusyPool()
{
Loop::run(function () {
$pool = $this->createPool(2);
$pool = $this->createPool(2);
$values = [42, 56, 72];
$values = [42, 56, 72];
$tasks = \array_map(function (int $value): Task {
return new Fixtures\TestTask($value);
}, $values);
$promises = \array_map(function (Task $task) use ($pool): Promise {
return $pool->enqueue($task);
}, $tasks);
$this->assertSame($values, yield $promises);
$promises = \array_map(function (Task $task) use ($pool): Promise {
return $pool->enqueue($task);
}, $tasks);
$this->assertSame($values, yield $promises);
yield $pool->shutdown();
}
public function testCreatePoolShouldThrowError()
{
$this->expectException(\Error::class);
$this->expectExceptionMessage('Maximum size must be a non-negative integer');
$this->createPool(-1);
}
public function testCleanGarbageCollection()
{
// See https://github.com/amphp/parallel-functions/issues/5
for ($i = 0; $i < 3; $i++) {
$pool = $this->createPool(32);
$values = \range(1, 50);
$tasks = \array_map(function (int $value): Task {
return new Fixtures\TestTask($value);
}, $values);
@ -159,59 +178,17 @@ abstract class AbstractPoolTest extends TestCase
}, $tasks);
$this->assertSame($values, yield $promises);
$promises = \array_map(function (Task $task) use ($pool): Promise {
return $pool->enqueue($task);
}, $tasks);
$this->assertSame($values, yield $promises);
yield $pool->shutdown();
});
}
/**
* @expectedException \Error
* @expectedExceptionMessage Maximum size must be a non-negative integer
*/
public function testCreatePoolShouldThrowError()
{
Loop::run(function () {
$this->createPool(-1);
});
}
public function testCleanGarbageCollection()
{
// See https://github.com/amphp/parallel-functions/issues/5
Loop::run(function () {
for ($i = 0; $i < 3; $i++) {
$pool = $this->createPool(32);
$values = \range(1, 50);
$tasks = \array_map(function (int $value): Task {
return new Fixtures\TestTask($value);
}, $values);
$promises = \array_map(function (Task $task) use ($pool): Promise {
return $pool->enqueue($task);
}, $tasks);
$this->assertSame($values, yield $promises);
}
});
}
}
public function testPooledKill()
{
// See https://github.com/amphp/parallel/issues/66
Loop::run(function () {
$pool = $this->createPool(1);
$worker = $pool->getWorker();
$worker->kill();
$worker2 = $pool->getWorker();
unset($worker); // Invoke destructor.
$this->assertTrue($worker2->isRunning());
});
$pool = $this->createPool(1);
$worker = $pool->getWorker();
$worker->kill();
$worker2 = $pool->getWorker();
unset($worker); // Invoke destructor.
$this->assertTrue($worker2->isRunning());
}
}

View File

@ -2,7 +2,7 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Loop;
use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Sync\PanicError;
use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\Worker\BasicEnvironment;
@ -11,7 +11,7 @@ use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\TaskError;
use Amp\Parallel\Worker\TaskException;
use Amp\Parallel\Worker\WorkerException;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class NonAutoloadableTask implements Task
{
@ -21,7 +21,7 @@ class NonAutoloadableTask implements Task
}
}
abstract class AbstractWorkerTest extends TestCase
abstract class AbstractWorkerTest extends AsyncTestCase
{
/**
* @param string $envClassName
@ -33,298 +33,265 @@ abstract class AbstractWorkerTest extends TestCase
public function testWorkerConstantDefined()
{
Loop::run(function () {
$worker = $this->createWorker();
$this->assertTrue(yield $worker->enqueue(new Fixtures\ConstantTask));
yield $worker->shutdown();
});
$worker = $this->createWorker();
$this->assertTrue(yield $worker->enqueue(new Fixtures\ConstantTask));
yield $worker->shutdown();
}
public function testIsRunning()
{
Loop::run(function () {
$worker = $this->createWorker();
$this->assertTrue($worker->isRunning());
$worker = $this->createWorker();
$this->assertTrue($worker->isRunning());
$worker->enqueue(new Fixtures\TestTask(42)); // Enqueue a task to start the worker.
$worker->enqueue(new Fixtures\TestTask(42)); // Enqueue a task to start the worker.
$this->assertTrue($worker->isRunning());
$this->assertTrue($worker->isRunning());
yield $worker->shutdown();
$this->assertFalse($worker->isRunning());
});
yield $worker->shutdown();
$this->assertFalse($worker->isRunning());
}
public function testIsIdleOnStart()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
$this->assertTrue($worker->isIdle());
$this->assertTrue($worker->isIdle());
yield $worker->shutdown();
});
yield $worker->shutdown();
}
/**
* @expectedException \Amp\Parallel\Context\StatusError
* @expectedExceptionMessage The worker has been shut down
*/
public function testEnqueueShouldThrowStatusError()
{
Loop::run(function () {
$worker = $this->createWorker();
$this->expectException(StatusError::class);
$this->expectExceptionMessage('The worker has been shut down');
$this->assertTrue($worker->isIdle());
$worker = $this->createWorker();
yield $worker->shutdown();
yield $worker->enqueue(new Fixtures\TestTask(42));
});
$this->assertTrue($worker->isIdle());
yield $worker->shutdown();
yield $worker->enqueue(new Fixtures\TestTask(42));
}
public function testEnqueue()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
$returnValue = yield $worker->enqueue(new Fixtures\TestTask(42));
$this->assertEquals(42, $returnValue);
$returnValue = yield $worker->enqueue(new Fixtures\TestTask(42));
$this->assertEquals(42, $returnValue);
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testEnqueueMultipleSynchronous()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
$values = yield \Amp\Promise\all([
$values = yield \Amp\Promise\all([
$worker->enqueue(new Fixtures\TestTask(42)),
$worker->enqueue(new Fixtures\TestTask(56)),
$worker->enqueue(new Fixtures\TestTask(72))
]);
$this->assertEquals([42, 56, 72], $values);
$this->assertEquals([42, 56, 72], $values);
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testEnqueueMultipleAsynchronous()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
$promises = [
$promises = [
$worker->enqueue(new Fixtures\TestTask(42, 200)),
$worker->enqueue(new Fixtures\TestTask(56, 300)),
$worker->enqueue(new Fixtures\TestTask(72, 100))
];
$expected = [42, 56, 72];
foreach ($promises as $promise) {
$promise->onResolve(function ($e, $v) use (&$expected) {
$this->assertSame(\array_shift($expected), $v);
});
}
$expected = [42, 56, 72];
foreach ($promises as $promise) {
$promise->onResolve(function ($e, $v) use (&$expected) {
$this->assertSame(\array_shift($expected), $v);
});
}
yield $promises; // Wait until all tasks have finished before invoking $worker->shutdown().
yield $promises; // Wait until all tasks have finished before invoking $worker->shutdown().
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testEnqueueMultipleThenShutdown()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
$promises = [
$promises = [
$worker->enqueue(new Fixtures\TestTask(42, 200)),
$worker->enqueue(new Fixtures\TestTask(56, 300)),
$worker->enqueue(new Fixtures\TestTask(72, 100))
];
yield $worker->shutdown();
yield $worker->shutdown();
\array_shift($promises); // First task will succeed.
\array_shift($promises); // First task will succeed.
foreach ($promises as $promise) {
$promise->onResolve(function ($e, $v) {
$this->assertInstanceOf(WorkerException::class, $e);
});
}
});
foreach ($promises as $promise) {
$promise->onResolve(function ($e, $v) {
$this->assertInstanceOf(WorkerException::class, $e);
});
}
}
public function testNotIdleOnEnqueue()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
$coroutine = $worker->enqueue(new Fixtures\TestTask(42));
$this->assertFalse($worker->isIdle());
yield $coroutine;
$coroutine = $worker->enqueue(new Fixtures\TestTask(42));
$this->assertFalse($worker->isIdle());
yield $coroutine;
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testKill()
{
$this->setTimeout(250);
$worker = $this->createWorker();
$worker->enqueue(new Fixtures\TestTask(42));
$this->assertRunTimeLessThan([$worker, 'kill'], 250);
$worker->kill();
$this->assertFalse($worker->isRunning());
}
public function testFailingTaskWithException()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
try {
yield $worker->enqueue(new Fixtures\FailingTask(\Exception::class));
} catch (TaskException $exception) {
$this->assertSame(\Exception::class, $exception->getName());
}
try {
yield $worker->enqueue(new Fixtures\FailingTask(\Exception::class));
} catch (TaskException $exception) {
$this->assertSame(\Exception::class, $exception->getName());
}
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testFailingTaskWithError()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
try {
yield $worker->enqueue(new Fixtures\FailingTask(\Error::class));
} catch (TaskError $exception) {
$this->assertSame(\Error::class, $exception->getName());
}
try {
yield $worker->enqueue(new Fixtures\FailingTask(\Error::class));
} catch (TaskError $exception) {
$this->assertSame(\Error::class, $exception->getName());
}
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testFailingTaskWithPreviousException()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
try {
yield $worker->enqueue(new Fixtures\FailingTask(\Error::class, \Exception::class));
} catch (TaskError $exception) {
$this->assertSame(\Error::class, $exception->getName());
$previous = $exception->getPrevious();
$this->assertInstanceOf(TaskException::class, $previous);
$this->assertSame(\Exception::class, $previous->getName());
}
try {
yield $worker->enqueue(new Fixtures\FailingTask(\Error::class, \Exception::class));
} catch (TaskError $exception) {
$this->assertSame(\Error::class, $exception->getName());
$previous = $exception->getPrevious();
$this->assertInstanceOf(TaskException::class, $previous);
$this->assertSame(\Exception::class, $previous->getName());
}
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testNonAutoloadableTask()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
try {
yield $worker->enqueue(new NonAutoloadableTask);
$this->fail("Tasks that cannot be autoloaded should throw an exception");
} catch (TaskError $exception) {
$this->assertSame("Error", $exception->getName());
$this->assertGreaterThan(0, \strpos($exception->getMessage(), \sprintf("Classes implementing %s", Task::class)));
}
try {
yield $worker->enqueue(new NonAutoloadableTask);
$this->fail("Tasks that cannot be autoloaded should throw an exception");
} catch (TaskError $exception) {
$this->assertSame("Error", $exception->getName());
$this->assertGreaterThan(0, \strpos($exception->getMessage(), \sprintf("Classes implementing %s", Task::class)));
}
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testUnserializableTask()
{
Loop::run(function () {
$worker = $this->createWorker();
$worker = $this->createWorker();
try {
yield $worker->enqueue(new class implements Task { // Anonymous classes are not serializable.
public function run(Environment $environment)
{
}
});
$this->fail("Tasks that cannot be serialized should throw an exception");
} catch (SerializationException $exception) {
$this->assertSame(0, \strpos($exception->getMessage(), "The given data cannot be sent because it is not serializable"));
}
yield $worker->shutdown();
});
}
public function testUnserializableResult()
{
Loop::run(function () {
$worker = $this->createWorker();
try {
yield $worker->enqueue(new Fixtures\UnserializableResultTask);
$this->fail("Tasks results that cannot be serialized should throw an exception");
} catch (TaskException $exception) {
$this->assertSame(0, \strpos($exception->getMessage(), "Uncaught Amp\Parallel\Sync\SerializationException in worker"));
}
yield $worker->shutdown();
});
}
public function testNonAutoloadableResult()
{
Loop::run(function () {
$worker = $this->createWorker();
try {
yield $worker->enqueue(new Fixtures\NonAutoloadableResultTask);
$this->fail("Tasks results that cannot be autoloaded should throw an exception");
} catch (\Error $exception) {
$this->assertSame(0, \strpos($exception->getMessage(), "Class instances returned from Amp\Parallel\Worker\Task::run() must be autoloadable by the Composer autoloader"));
}
yield $worker->shutdown();
});
}
public function testUnserializableTaskFollowedByValidTask()
{
Loop::run(function () {
$worker = $this->createWorker();
$promise1 = $worker->enqueue(new class implements Task { // Anonymous classes are not serializable.
try {
yield $worker->enqueue(new class implements Task { // Anonymous classes are not serializable.
public function run(Environment $environment)
{
}
});
$promise2 = $worker->enqueue(new Fixtures\TestTask(42));
$this->fail("Tasks that cannot be serialized should throw an exception");
} catch (SerializationException $exception) {
$this->assertSame(0, \strpos($exception->getMessage(), "The given data cannot be sent because it is not serializable"));
}
$this->assertSame(42, yield $promise2);
yield $worker->shutdown();
}
yield $worker->shutdown();
public function testUnserializableResult()
{
$worker = $this->createWorker();
try {
yield $worker->enqueue(new Fixtures\UnserializableResultTask);
$this->fail("Tasks results that cannot be serialized should throw an exception");
} catch (TaskException $exception) {
$this->assertSame(0, \strpos($exception->getMessage(), "Uncaught Amp\Parallel\Sync\SerializationException in worker"));
}
yield $worker->shutdown();
}
public function testNonAutoloadableResult()
{
$worker = $this->createWorker();
try {
yield $worker->enqueue(new Fixtures\NonAutoloadableResultTask);
$this->fail("Tasks results that cannot be autoloaded should throw an exception");
} catch (\Error $exception) {
$this->assertSame(0, \strpos($exception->getMessage(), "Class instances returned from Amp\Parallel\Worker\Task::run() must be autoloadable by the Composer autoloader"));
}
yield $worker->shutdown();
}
public function testUnserializableTaskFollowedByValidTask()
{
$worker = $this->createWorker();
$promise1 = $worker->enqueue(new class implements Task { // Anonymous classes are not serializable.
public function run(Environment $environment)
{
}
});
$promise2 = $worker->enqueue(new Fixtures\TestTask(42));
$this->assertSame(42, yield $promise2);
yield $worker->shutdown();
}
public function testCustomAutoloader()
{
Loop::run(function () {
$worker = $this->createWorker(BasicEnvironment::class, __DIR__ . '/Fixtures/custom-bootstrap.php');
$worker = $this->createWorker(BasicEnvironment::class, __DIR__ . '/Fixtures/custom-bootstrap.php');
$this->assertTrue(yield $worker->enqueue(new Fixtures\AutoloadTestTask));
$this->assertTrue(yield $worker->enqueue(new Fixtures\AutoloadTestTask));
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testInvalidCustomAutoloader()
@ -332,12 +299,10 @@ abstract class AbstractWorkerTest extends TestCase
$this->expectException(PanicError::class);
$this->expectExceptionMessage('No file found at bootstrap file path given');
Loop::run(function () {
$worker = $this->createWorker(BasicEnvironment::class, __DIR__ . '/Fixtures/not-found.php');
$worker = $this->createWorker(BasicEnvironment::class, __DIR__ . '/Fixtures/not-found.php');
$this->assertTrue(yield $worker->enqueue(new Fixtures\AutoloadTestTask));
$this->assertTrue(yield $worker->enqueue(new Fixtures\AutoloadTestTask));
yield $worker->shutdown();
});
yield $worker->shutdown();
}
}

View File

@ -3,11 +3,10 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Delayed;
use Amp\Loop;
use Amp\Parallel\Worker\BasicEnvironment;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class BasicEnvironmentTest extends TestCase
class BasicEnvironmentTest extends AsyncTestCase
{
public function testBasicOperations()
{
@ -37,12 +36,11 @@ class BasicEnvironmentTest extends TestCase
$this->assertNull($environment->set($key, null));
}
/**
* @expectedException \Error
* @expectedExceptionMessage The time-to-live must be a positive integer or null
*/
public function testSetShouleThrowError()
{
$this->expectException(\Error::class);
$this->expectExceptionMessage('The time-to-live must be a positive integer or null');
$environment = new BasicEnvironment;
$key = "key";
$environment->set($key, 1, 0);
@ -83,16 +81,14 @@ class BasicEnvironmentTest extends TestCase
public function testTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key = "key";
$environment = new BasicEnvironment;
$key = "key";
$environment->set($key, 1, 2);
$environment->set($key, 1, 2);
yield new Delayed(3000);
yield new Delayed(3000);
$this->assertFalse($environment->exists($key));
});
$this->assertFalse($environment->exists($key));
}
/**
@ -100,74 +96,66 @@ class BasicEnvironmentTest extends TestCase
*/
public function testRemovingTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key = "key";
$environment = new BasicEnvironment;
$key = "key";
$environment->set($key, 1, 1);
$environment->set($key, 1, 1);
$environment->set($key, 2);
$environment->set($key, 2);
yield new Delayed(2000);
yield new Delayed(2000);
$this->assertTrue($environment->exists($key));
$this->assertSame(2, $environment->get($key));
});
$this->assertTrue($environment->exists($key));
$this->assertSame(2, $environment->get($key));
}
public function testShorteningTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key = "key";
$environment = new BasicEnvironment;
$key = "key";
$environment->set($key, 1, 10);
$environment->set($key, 1, 1);
$environment->set($key, 1, 10);
$environment->set($key, 1, 1);
yield new Delayed(2000);
yield new Delayed(2000);
$this->assertFalse($environment->exists($key));
});
$this->assertFalse($environment->exists($key));
}
public function testLengtheningTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key = "key";
$environment = new BasicEnvironment;
$key = "key";
$environment->set($key, 1, 1);
$environment->set($key, 1, 3);
$environment->set($key, 1, 1);
$environment->set($key, 1, 3);
yield new Delayed(2000);
yield new Delayed(2000);
$this->assertTrue($environment->exists($key));
$this->assertTrue($environment->exists($key));
yield new Delayed(1100);
yield new Delayed(1100);
$this->assertFalse($environment->exists($key));
});
$this->assertFalse($environment->exists($key));
}
public function testAccessExtendsTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key1 = "key1";
$key2 = "key2";
$environment = new BasicEnvironment;
$key1 = "key1";
$key2 = "key2";
$environment->set($key1, 1, 2);
$environment->set($key2, 2, 2);
$environment->set($key1, 1, 2);
$environment->set($key2, 2, 2);
yield new Delayed(1000);
yield new Delayed(1000);
$this->assertSame(1, $environment->get($key1));
$this->assertTrue($environment->exists($key2));
$this->assertSame(1, $environment->get($key1));
$this->assertTrue($environment->exists($key2));
yield new Delayed(1500);
yield new Delayed(1500);
$this->assertTrue($environment->exists($key1));
$this->assertFalse($environment->exists($key2));
});
$this->assertTrue($environment->exists($key1));
$this->assertFalse($environment->exists($key2));
}
}

View File

@ -2,23 +2,20 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Loop;
use Amp\Parallel\Worker\BootstrapWorkerFactory;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class BootstrapWorkerFactoryTest extends TestCase
class BootstrapWorkerFactoryTest extends AsyncTestCase
{
public function testAutoloading()
{
$factory = new BootstrapWorkerFactory(__DIR__ . '/Fixtures/custom-bootstrap.php');
Loop::run(function () use ($factory) {
$worker = $factory->create();
$worker = $factory->create();
$this->assertTrue(yield $worker->enqueue(new Fixtures\AutoloadTestTask));
$this->assertTrue(yield $worker->enqueue(new Fixtures\AutoloadTestTask));
yield $worker->shutdown();
});
yield $worker->shutdown();
}
public function testInvalidAutoloaderPath()

View File

@ -4,25 +4,23 @@ namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\DefaultWorkerFactory;
use Amp\Parallel\Worker\Worker;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class DefaultWorkerFactoryTest extends TestCase
class DefaultWorkerFactoryTest extends AsyncTestCase
{
/**
* @expectedException \Error
* @expectedExceptionMessage Invalid environment class name 'Invalid'
*/
public function testInvalidClassName()
{
$this->expectException(\Error::class);
$this->expectExceptionMessage("Invalid environment class name 'Invalid'");
$factory = new DefaultWorkerFactory("Invalid");
}
/**
* @expectedException \Error
* @expectedExceptionMessage does not implement 'Amp\Parallel\Worker\Environment'
*/
public function testNonEnvironmentClassName()
{
$this->expectException(\Error::class);
$this->expectExceptionMessage("does not implement 'Amp\\Parallel\\Worker\\Environment'");
$factory = new DefaultWorkerFactory(DefaultWorkerFactory::class);
}

View File

@ -7,11 +7,11 @@ use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Pool;
use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\WorkerFactory;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Promise;
use Amp\Success;
class FunctionsTest extends TestCase
class FunctionsTest extends AsyncTestCase
{
public function testPool()
{

View File

@ -3,9 +3,9 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\Internal\Job;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class JobTest extends TestCase
class JobTest extends AsyncTestCase
{
public function testGetJob()
{
@ -14,12 +14,11 @@ class JobTest extends TestCase
$this->assertSame($task, $job->getTask());
}
/**
* @expectedException \Error
* @expectedExceptionMessage Classes implementing Amp\Parallel\Worker\Task must be autoloadable by the Composer autoloader
*/
public function testUnserialiableClass()
{
$this->expectException(\Error::class);
$this->expectExceptionMessage('Classes implementing Amp\\Parallel\\Worker\\Task must be autoloadable by the Composer autoloader');
$task = new Fixtures\TestTask(42);
$job = new Job($task);
$serialized = \serialize($job);

View File

@ -3,9 +3,9 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\TaskError;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class TaskErrorTest extends TestCase
class TaskErrorTest extends AsyncTestCase
{
public function testGetWorkerTrace()
{

View File

@ -3,9 +3,9 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\TaskException;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class TaskExceptionTest extends TestCase
class TaskExceptionTest extends AsyncTestCase
{
public function testGetName()
{

View File

@ -3,31 +3,29 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\Internal\TaskFailure;
use Amp\Parallel\Worker\Worker;
use Amp\PHPUnit\TestCase;
use Amp\Promise;
use Amp\Parallel\Worker\TaskError;
use Amp\Parallel\Worker\TaskException;
use Amp\PHPUnit\AsyncTestCase;
class TaskFailureTest extends TestCase
class TaskFailureTest extends AsyncTestCase
{
/**
* @expectedException \Amp\Parallel\Worker\TaskException
* @expectedExceptionMessage Uncaught Exception in worker
*/
public function testWithException()
{
$this->expectException(TaskException::class);
$this->expectExceptionMessage('Uncaught Exception in worker');
$exception = new \Exception("Message", 1);
$result = new TaskFailure('a', $exception);
Promise\wait($result->promise());
yield $result->promise();
}
/**
* @expectedException \Amp\Parallel\Worker\TaskError
* @expectedExceptionMessage Uncaught Error in worker
*/
public function testWithError()
{
$this->expectException(TaskError::class);
$this->expectExceptionMessage('Uncaught Error in worker');
$exception = new \Error("Message", 1);
$result = new TaskFailure('a', $exception);
Promise\wait($result->promise());
yield $result->promise();
}
}

View File

@ -3,10 +3,10 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\Internal\TaskSuccess;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Promise;
class TaskSuccessTest extends TestCase
class TaskSuccessTest extends AsyncTestCase
{
public function testGetId()
{

View File

@ -3,9 +3,9 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\WorkerException;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\AsyncTestCase;
class WorkerExceptionTest extends TestCase
class WorkerExceptionTest extends AsyncTestCase
{
public function testConstructorShouldBeInstance()
{