1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Maintain task order for fast-finishing/blocking tasks

This commit is contained in:
Aaron Piotrowski 2017-06-19 22:14:19 -05:00
parent 998a255c83
commit 29a1d1bb8c
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB

View File

@ -51,12 +51,13 @@ abstract class AbstractWorker implements Worker {
$deferred = $this->jobQueue[$id];
unset($this->jobQueue[$id]);
if (!empty($this->jobQueue)) {
$this->context->receive()->onResolve($this->onResolve);
}
$empty = empty($this->jobQueue);
$deferred->resolve($data->promise());
if (!$empty) {
$this->context->receive()->onResolve($this->onResolve);
}
};
}
@ -108,13 +109,16 @@ abstract class AbstractWorker implements Worker {
* @throws \Amp\Parallel\Worker\WorkerException
*/
private function doEnqueue(Task $task): \Generator {
if (empty($this->jobQueue)) {
$this->context->receive()->onResolve($this->onResolve);
}
$empty = empty($this->jobQueue);
try {
$job = new Internal\Job($task);
$this->jobQueue[$job->getId()] = $deferred = new Deferred;
if ($empty) {
$this->context->receive()->onResolve($this->onResolve);
}
yield $this->context->send($job);
} catch (\Throwable $exception) {
$exception = new WorkerException("Sending the task to the worker failed", $exception);