1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Throw if sending task fails

This commit is contained in:
Aaron Piotrowski 2017-12-13 16:29:44 -06:00
parent acdfa66b12
commit 9e7d1e0801
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
4 changed files with 42 additions and 25 deletions

View File

@ -174,13 +174,7 @@ class Process implements Context {
throw new \Error("Cannot send exit result objects");
}
return call(function () use ($data) {
try {
yield $this->channel->send($data);
} catch (ChannelException $e) {
throw new ContextException("The context went away, potentially due to a fatal error or calling exit", 0, $e);
}
});
return $this->channel->send($data);
}
/**
@ -197,9 +191,6 @@ class Process implements Context {
if (!$data instanceof ExitResult) {
throw new SynchronizationError("Did not receive an exit result from process");
}
} catch (ChannelException $e) {
$this->kill();
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
} catch (\Throwable $exception) {
$this->kill();
throw $exception;

View File

@ -254,8 +254,6 @@ class Thread implements Context {
try {
$data = yield $this->channel->receive();
} catch (ChannelException $e) {
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
} finally {
Loop::disable($this->watcher);
}
@ -288,12 +286,12 @@ class Thread implements Context {
Loop::enable($this->watcher);
try {
yield $this->channel->send($data);
} catch (ChannelException $e) {
throw new ContextException("The context went away, potentially due to a fatal error or calling exit", 0, $e);
$result = yield $this->channel->send($data);
} finally {
Loop::disable($this->watcher);
}
return $result;
});
}
}

View File

@ -92,18 +92,27 @@ abstract class AbstractWorker implements Worker {
$this->context->start();
}
$empty = empty($this->jobQueue);
return call(function () use ($task) {
$empty = empty($this->jobQueue);
$job = new Internal\Job($task);
$this->jobQueue[$job->getId()] = $deferred = new Deferred;
$job = new Internal\Job($task);
$this->jobQueue[$job->getId()] = $deferred = new Deferred;
$this->context->send($job);
try {
yield $this->context->send($job);
if ($empty) {
$this->context->receive()->onResolve($this->onResolve);
}
} catch (\Throwable $exception) {
unset($this->jobQueue[$job->getId()]);
if (!empty($this->jobQueue)) {
$this->context->receive()->onResolve($this->onResolve);
}
$deferred->fail($exception);
}
if ($empty) {
$this->context->receive()->onResolve($this->onResolve);
}
return $deferred->promise();
return $deferred->promise();
});
}
/**

View File

@ -3,6 +3,7 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Loop;
use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\TaskError;
@ -121,7 +122,7 @@ abstract class AbstractWorkerTest extends TestCase {
$this->assertFalse($worker->isRunning());
}
public function testUnserializableTask() {
public function testNonAutoloadableTask() {
Loop::run(function () {
$worker = $this->createWorker();
@ -136,4 +137,22 @@ abstract class AbstractWorkerTest extends TestCase {
yield $worker->shutdown();
});
}
public function testUnserializableTask() {
Loop::run(function () {
$worker = $this->createWorker();
try {
yield $worker->enqueue(new class implements Task { // Anonymous classes are not serializable.
public function run(Environment $environment) {
}
});
$this->fail("Tasks that cannot be autoloaded should throw an exception");
} catch (SerializationException $exception) {
$this->assertSame(0, \strpos($exception->getMessage(), "The given data cannot be sent because it is not serializable"));
}
yield $worker->shutdown();
});
}
}