1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Tweak worker pool; reject pending tasks on shutdown

This commit is contained in:
Aaron Piotrowski 2015-09-02 08:51:59 -05:00
parent c0ed3e3c05
commit 7a60d74937
6 changed files with 82 additions and 37 deletions

View File

@ -4,18 +4,21 @@ require dirname(__DIR__).'/vendor/autoload.php';
use Icicle\Concurrent\Worker;
use Icicle\Concurrent\Worker\HelloTask;
use Icicle\Coroutine;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
use Icicle\Promise;
Coroutine\create(function () {
$generator = function () {
$returnValues = (yield Promise\all([
Worker\enqueue(new HelloTask()),
Worker\enqueue(new HelloTask()),
Worker\enqueue(new HelloTask()),
new Coroutine(Worker\enqueue(new HelloTask())),
new Coroutine(Worker\enqueue(new HelloTask())),
new Coroutine(Worker\enqueue(new HelloTask())),
]));
var_dump($returnValues);
})->done();
};
$coroutine = new Coroutine($generator());
$coroutine->done();
Loop\run();

View File

@ -0,0 +1,6 @@
<?php
namespace Icicle\Concurrent\Exception;
class WorkerException extends Exception
{
}

View File

@ -4,7 +4,7 @@ namespace Icicle\Concurrent\Threading;
use Icicle\Concurrent\ChannelInterface;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Sync\Internal\ExitStatusInterface;
use Icicle\Concurrent\Sync\ChannelInterface as SyncChannelInterface;;
use Icicle\Concurrent\Sync;
use Icicle\Concurrent\SynchronizableInterface;
use Icicle\Coroutine;
@ -26,7 +26,7 @@ class Executor implements ChannelInterface, SynchronizableInterface
* @param \Icicle\Concurrent\Threading\Internal\Thread $thread
* @param \Icicle\Concurrent\Sync\ChannelInterface $channel
*/
public function __construct(Internal\Thread $thread, SyncChannelInterface $channel)
public function __construct(Internal\Thread $thread, Sync\ChannelInterface $channel)
{
$this->thread = $thread;
$this->channel = $channel;

View File

@ -3,6 +3,7 @@ namespace Icicle\Concurrent\Worker;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Exception\SynchronizationError;
use Icicle\Concurrent\Exception\WorkerException;
use Icicle\Coroutine\Coroutine;
use Icicle\Promise;
use Icicle\Promise\Deferred;
@ -65,13 +66,11 @@ class Pool implements PoolInterface
/**
* Creates a new worker pool.
*
* @param int|null $minSize The minimum number of workers the pool should spawn. Defaults to
* `Pool::DEFAULT_MIN_SIZE`.
* @param int|null $maxSize The maximum number of workers the pool should spawn. Defaults to
* `Pool::DEFAULT_MAX_SIZE`.
* @param int $minSize The minimum number of workers the pool should spawn. Defaults to `Pool::DEFAULT_MIN_SIZE`.
* @param int $maxSize The maximum number of workers the pool should spawn. Defaults to `Pool::DEFAULT_MAX_SIZE`.
* @param WorkerFactoryInterface|null $factory A worker factory to be used to create new workers.
*/
public function __construct($minSize = null, $maxSize = null, WorkerFactoryInterface $factory = null)
public function __construct($minSize = 0, $maxSize = 0, WorkerFactoryInterface $factory = null)
{
$minSize = $minSize ?: static::DEFAULT_MIN_SIZE;
$maxSize = $minSize ?: static::DEFAULT_MAX_SIZE;
@ -179,25 +178,22 @@ class Pool implements PoolInterface
*
* @resolve mixed The return value of the task.
*/
public function enqueue(TaskInterface $task /* , ...$args */)
public function enqueue(TaskInterface $task)
{
if (!$this->running) {
if (!$this->isRunning()) {
throw new SynchronizationError('The worker pool has not been started.');
}
$args = array_slice(func_get_args(), 1);
// Enqueue the task if we have an idle worker.
if ($worker = $this->getIdleWorker()) {
yield $this->enqueueToWorker($worker, $task, $args);
yield $this->enqueueToWorker($worker, $task);
return;
}
// If we're at our limit of busy workers, add the task to the waiting list to be enqueued later when a new
// worker becomes available.
$deferred = new Deferred();
$this->busyQueue->enqueue($task);
$this->deferredQueue->enqueue($deferred);
$this->busyQueue->enqueue([$task, $deferred]);
// Yield a promise that will be resolved when the task gets processed later.
yield $deferred->getPromise();
@ -212,14 +208,21 @@ class Pool implements PoolInterface
*/
public function shutdown()
{
if (!$this->isRunning()) {
throw new SynchronizationError('The pool is not running.');
}
$this->close();
$shutdowns = [];
foreach ($this->workers as $worker) {
$shutdowns[] = new Coroutine($worker->shutdown());
}
yield Promise\all($shutdowns);
$this->running = false;
yield Promise\reduce($shutdowns, function ($carry, $value) {
return $carry ?: $value;
}, 0);
}
/**
@ -227,21 +230,29 @@ class Pool implements PoolInterface
*/
public function kill()
{
foreach ($this->workers as $worker) {
$worker->kill();
}
if ($this->isRunning()) {
$this->close();
$this->running = false;
foreach ($this->workers as $worker) {
$worker->kill();
}
}
}
/**
* Shuts down the pool when it is destroyed.
* Rejects any queued tasks.
*/
public function __destruct()
private function close()
{
if ($this->isRunning()) {
$coroutine = new Coroutine($this->shutdown());
$coroutine->done();
$this->running = false;
if (!$this->busyQueue->isEmpty()) {
$exception = new WorkerException('Worker pool was shutdown.');
do {
/** @var \Icicle\Promise\Deferred $deferred */
list(, $deferred) = $this->busyQueue->dequeue();
$deferred->reject($exception);
} while (!$this->busyQueue->isEmpty());
}
}
@ -277,6 +288,8 @@ class Pool implements PoolInterface
if ($this->getWorkerCount() < $this->maxSize) {
return $this->createWorker();
}
return null; // No idle workers available and cannot spawn more.
}
/**
@ -288,17 +301,16 @@ class Pool implements PoolInterface
* @coroutine
*
* @param WorkerInterface $worker The worker to enqueue to.
* @param TaskInterface $task The task to enqueue.
* @param array $args An array of arguments to pass to the task.
* @param TaskInterface $task The task to enqueue.
*
* @return \Generator
*
* @resolve mixed The return value of the task.
*/
private function enqueueToWorker(WorkerInterface $worker, TaskInterface $task, array $args = [])
private function enqueueToWorker(WorkerInterface $worker, TaskInterface $task)
{
$this->idleWorkers->detach($worker);
yield call_user_func_array([$worker, 'enqueue'], array_merge([$task], $args));
yield $worker->enqueue($task);
$this->idleWorkers->attach($worker);
// Spawn a new coroutine to process the busy queue if not empty.
@ -324,8 +336,8 @@ class Pool implements PoolInterface
break;
}
$task = $this->busyQueue->dequeue();
$deferred = $this->deferredQueue->dequeue();
/** @var \Icicle\Promise\Deferred $deferred */
list($task, $deferred) = $this->busyQueue->dequeue();
try {
$returnValue = (yield $this->enqueueToWorker($worker, $task));

View File

@ -79,6 +79,10 @@ class Worker implements WorkerInterface
*/
public function shutdown()
{
if (!$this->context->isRunning()) {
throw new SynchronizationError('The worker is not running.');
}
yield $this->context->send(0);
yield $this->context->join();

View File

@ -41,4 +41,24 @@ if (!function_exists(__NAMESPACE__ . '\pool')) {
{
return pool()->enqueue($task);
}
/**
* @param \Icicle\Concurrent\Worker\WorkerFactoryInterface|null $factory
*
* @return \Icicle\Concurrent\Worker\WorkerInterface
*/
function create(WorkerFactoryInterface $factory = null)
{
static $instance;
if (null !== $factory) {
$instance = $factory;
} elseif (null === $instance) {
$instance = new WorkerFactory();
}
$worker = $instance->create();
$worker->start();
return $worker;
}
}