1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Handle rapidly enqueued tasks on workers

This commit is contained in:
Stephen Coakley 2015-12-11 18:15:15 -06:00
parent d8b21c3283
commit 2d001c5bfe
2 changed files with 54 additions and 1 deletions

View File

@ -1,10 +1,14 @@
<?php
namespace Icicle\Concurrent\Worker;
use Icicle\Awaitable\Delayed;
use Icicle\Concurrent\Context;
use Icicle\Concurrent\Exception\StatusError;
use Icicle\Concurrent\Worker\Internal\TaskFailure;
/**
* Base class for most common types of task workers.
*/
abstract class AbstractWorker implements Worker
{
/**
@ -22,12 +26,18 @@ abstract class AbstractWorker implements Worker
*/
private $shutdown = false;
/**
* @var \SplQueue
*/
private $busyQueue;
/**
* @param \Icicle\Concurrent\Context $context
*/
public function __construct(Context $context)
{
$this->context = $context;
$this->busyQueue = new \SplQueue();
}
/**
@ -64,7 +74,14 @@ abstract class AbstractWorker implements Worker
}
if ($this->shutdown) {
throw new StatusError('The worker has been shutdown.');
throw new StatusError('The worker has been shut down.');
}
// If the worker is currently busy, store the task in a busy queue.
if (!$this->idle) {
$delayed = new Delayed();
$this->busyQueue->enqueue($delayed);
yield $delayed;
}
$this->idle = false;
@ -75,6 +92,11 @@ abstract class AbstractWorker implements Worker
$this->idle = true;
// We're no longer busy at the moment, so dequeue a waiting task.
if (!$this->busyQueue->isEmpty()) {
$this->busyQueue->dequeue()->resolve();
}
if ($result instanceof TaskFailure) {
throw $result->getException();
}
@ -96,6 +118,11 @@ abstract class AbstractWorker implements Worker
yield $this->context->send(0);
yield $this->context->join();
// Cancel any waiting tasks.
while (!$this->busyQueue->isEmpty()) {
$this->busyQueue->dequeue()->reject();
}
}
/**
@ -104,5 +131,10 @@ abstract class AbstractWorker implements Worker
public function kill()
{
$this->context->kill();
// Cancel any waiting tasks.
while (!$this->busyQueue->isEmpty()) {
$this->busyQueue->dequeue()->reject();
}
}
}

View File

@ -1,6 +1,7 @@
<?php
namespace Icicle\Tests\Concurrent\Worker;
use Icicle\Awaitable;
use Icicle\Coroutine;
use Icicle\Loop;
use Icicle\Tests\Concurrent\TestCase;
@ -57,6 +58,26 @@ abstract class AbstractWorkerTest extends TestCase
Loop\run();
}
public function testEnqueueMultiple()
{
Coroutine\create(function () {
$worker = $this->createWorker();
$worker->start();
$values = (yield Awaitable\all([
new Coroutine\Coroutine($worker->enqueue(new TestTask(42))),
new Coroutine\Coroutine($worker->enqueue(new TestTask(56))),
new Coroutine\Coroutine($worker->enqueue(new TestTask(72)))
]));
$this->assertEquals([42, 56, 72], $values);
yield $worker->shutdown();
})->done();
Loop\run();
}
public function testNotIdleOnEnqueue()
{
Coroutine\create(function () {