From 2d001c5bfeb7f7ceef1897c4bf6d720e98bba3bf Mon Sep 17 00:00:00 2001 From: Stephen Coakley Date: Fri, 11 Dec 2015 18:15:15 -0600 Subject: [PATCH] Handle rapidly enqueued tasks on workers --- src/Worker/AbstractWorker.php | 34 ++++++++++++++++++++++++++++- tests/Worker/AbstractWorkerTest.php | 21 ++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/Worker/AbstractWorker.php b/src/Worker/AbstractWorker.php index 7bdbea8..306a239 100644 --- a/src/Worker/AbstractWorker.php +++ b/src/Worker/AbstractWorker.php @@ -1,10 +1,14 @@ context = $context; + $this->busyQueue = new \SplQueue(); } /** @@ -64,7 +74,14 @@ abstract class AbstractWorker implements Worker } if ($this->shutdown) { - throw new StatusError('The worker has been shutdown.'); + throw new StatusError('The worker has been shut down.'); + } + + // If the worker is currently busy, store the task in a busy queue. + if (!$this->idle) { + $delayed = new Delayed(); + $this->busyQueue->enqueue($delayed); + yield $delayed; } $this->idle = false; @@ -75,6 +92,11 @@ abstract class AbstractWorker implements Worker $this->idle = true; + // We're no longer busy at the moment, so dequeue a waiting task. + if (!$this->busyQueue->isEmpty()) { + $this->busyQueue->dequeue()->resolve(); + } + if ($result instanceof TaskFailure) { throw $result->getException(); } @@ -96,6 +118,11 @@ abstract class AbstractWorker implements Worker yield $this->context->send(0); yield $this->context->join(); + + // Cancel any waiting tasks. + while (!$this->busyQueue->isEmpty()) { + $this->busyQueue->dequeue()->reject(); + } } /** @@ -104,5 +131,10 @@ abstract class AbstractWorker implements Worker public function kill() { $this->context->kill(); + + // Cancel any waiting tasks. + while (!$this->busyQueue->isEmpty()) { + $this->busyQueue->dequeue()->reject(); + } } } diff --git a/tests/Worker/AbstractWorkerTest.php b/tests/Worker/AbstractWorkerTest.php index 7b6eefe..83ca506 100644 --- a/tests/Worker/AbstractWorkerTest.php +++ b/tests/Worker/AbstractWorkerTest.php @@ -1,6 +1,7 @@ createWorker(); + $worker->start(); + + $values = (yield Awaitable\all([ + new Coroutine\Coroutine($worker->enqueue(new TestTask(42))), + new Coroutine\Coroutine($worker->enqueue(new TestTask(56))), + new Coroutine\Coroutine($worker->enqueue(new TestTask(72))) + ])); + + $this->assertEquals([42, 56, 72], $values); + + yield $worker->shutdown(); + })->done(); + + Loop\run(); + } + public function testNotIdleOnEnqueue() { Coroutine\create(function () {