diff --git a/lib/Worker/Internal/TaskSuccess.php b/lib/Worker/Internal/TaskSuccess.php index 531fbfc..962cdba 100644 --- a/lib/Worker/Internal/TaskSuccess.php +++ b/lib/Worker/Internal/TaskSuccess.php @@ -2,6 +2,8 @@ namespace Amp\Parallel\Worker\Internal; +use Amp\Failure; +use Amp\Parallel\Worker\Task; use Amp\Promise; use Amp\Success; @@ -19,6 +21,13 @@ final class TaskSuccess extends TaskResult public function promise(): Promise { + if ($this->result instanceof \__PHP_Incomplete_Class) { + return new Failure(new \Error(\sprintf( + "Class instances returned from %s::run() must be autoloadable by the Composer autoloader", + Task::class + ))); + } + return new Success($this->result); } } diff --git a/lib/Worker/TaskRunner.php b/lib/Worker/TaskRunner.php index d86d982..98ed26d 100644 --- a/lib/Worker/TaskRunner.php +++ b/lib/Worker/TaskRunner.php @@ -4,6 +4,7 @@ namespace Amp\Parallel\Worker; use Amp\Coroutine; use Amp\Parallel\Sync\Channel; +use Amp\Parallel\Sync\SerializationException; use Amp\Promise; use function Amp\call; @@ -50,7 +51,12 @@ final class TaskRunner $job = null; // Free memory from last job. - yield $this->channel->send($result); + try { + yield $this->channel->send($result); + } catch (SerializationException $exception) { + // Could not serialize task result. + yield $this->channel->send(new Internal\TaskFailure($result->getId(), $exception)); + } $result = null; // Free memory from last result. diff --git a/test/Worker/AbstractWorkerTest.php b/test/Worker/AbstractWorkerTest.php index 0630c9c..ba5f7d3 100644 --- a/test/Worker/AbstractWorkerTest.php +++ b/test/Worker/AbstractWorkerTest.php @@ -7,6 +7,7 @@ use Amp\Parallel\Sync\SerializationException; use Amp\Parallel\Worker\Environment; use Amp\Parallel\Worker\Task; use Amp\Parallel\Worker\TaskError; +use Amp\Parallel\Worker\TaskException; use Amp\Parallel\Worker\WorkerException; use Amp\PHPUnit\TestCase; @@ -203,7 +204,7 @@ abstract class AbstractWorkerTest extends TestCase { } }); - $this->fail("Tasks that cannot be autoloaded should throw an exception"); + $this->fail("Tasks that cannot be serialized should throw an exception"); } catch (SerializationException $exception) { $this->assertSame(0, \strpos($exception->getMessage(), "The given data cannot be sent because it is not serializable")); } @@ -212,6 +213,38 @@ abstract class AbstractWorkerTest extends TestCase }); } + public function testUnserializableResult() + { + Loop::run(function () { + $worker = $this->createWorker(); + + try { + yield $worker->enqueue(new UnserializableResultTask); + $this->fail("Tasks results that cannot be serialized should throw an exception"); + } catch (TaskException $exception) { + $this->assertSame(0, \strpos($exception->getMessage(), "Uncaught Amp\Parallel\Sync\SerializationException in worker")); + } + + yield $worker->shutdown(); + }); + } + + public function testNonAutoloadableResult() + { + Loop::run(function () { + $worker = $this->createWorker(); + + try { + yield $worker->enqueue(new NonAutoloadableResultTask); + $this->fail("Tasks results that cannot be autoloaded should throw an exception"); + } catch (\Error $exception) { + $this->assertSame(0, \strpos($exception->getMessage(), "Class instances returned from Amp\Parallel\Worker\Task::run() must be autoloadable by the Composer autoloader")); + } + + yield $worker->shutdown(); + }); + } + public function testUnserializableTaskFollowedByValidTask() { Loop::run(function () { diff --git a/test/Worker/NonAutoloadableResultTask.php b/test/Worker/NonAutoloadableResultTask.php new file mode 100644 index 0000000..d32c4e7 --- /dev/null +++ b/test/Worker/NonAutoloadableResultTask.php @@ -0,0 +1,15 @@ +