1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Combine pool and queue; rename pull() to get()

This commit is contained in:
Aaron Piotrowski 2016-01-14 18:54:53 -06:00
parent 59907f7884
commit 5d5119d600
8 changed files with 91 additions and 730 deletions

View File

@ -52,6 +52,11 @@ class DefaultPool implements Pool
*/
private $busyQueue;
/**
* @var \Closure
*/
private $push;
/**
* Creates a new worker pool.
*
@ -86,6 +91,10 @@ class DefaultPool implements Pool
$this->workers = new \SplObjectStorage();
$this->idleWorkers = new \SplQueue();
$this->busyQueue = new \SplQueue();
$this->push = function (Worker $worker) {
$this->push($worker);
};
}
/**
@ -178,47 +187,8 @@ class DefaultPool implements Pool
*/
public function enqueue(Task $task)
{
if (!$this->isRunning()) {
throw new StatusError('The worker pool has not been started.');
}
// Find a free worker if one is available.
if (!$this->idleWorkers->isEmpty()) {
$worker = $this->idleWorkers->dequeue();
} elseif ($this->getWorkerCount() < $this->maxSize) {
// We are allowed to spawn another worker if needed, so do so.
$worker = $this->createWorker();
} else {
// We have no other choice but to wait for a worker to be freed up.
$delayed = new Delayed();
$this->busyQueue->enqueue($delayed);
$worker = (yield $delayed);
}
try {
$result = (yield $worker->enqueue($task));
} finally {
if (!$worker->isRunning()) {
// Worker crashed, discard it and spin up a new worker.
$this->workers->detach($worker);
$worker = $this->createWorker();
}
// If someone is waiting on a worker, give it to them instead.
if (!$this->busyQueue->isEmpty()) {
/** @var \Icicle\Awaitable\Delayed $delayed */
$delayed = $this->busyQueue->dequeue();
if ($delayed->isPending()) {
$delayed->resolve($worker);
}
} else {
// No tasks are waiting, so add the worker to the idle queue.
$this->idleWorkers->enqueue($worker);
}
}
yield $result;
$worker = $this->get();
yield $worker->enqueue($task);
}
/**
@ -289,7 +259,65 @@ class DefaultPool implements Pool
$worker = $this->factory->create();
$worker->start();
$this->workers->attach($worker);
$this->workers->attach($worker, 0);
return $worker;
}
/**
* {@inheritdoc}
*/
public function get()
{
if (!$this->isRunning()) {
throw new StatusError('The queue is not running.');
}
do {
if ($this->idleWorkers->isEmpty()) {
if ($this->getWorkerCount() >= $this->maxSize) {
// All possible workers busy, so shift from head (will be pushed back onto tail below).
$worker = $this->busyQueue->shift();
} else {
// Max worker count has not been reached, so create another worker.
$worker = $this->createWorker();
}
} else {
// Shift a worker off the idle queue.
$worker = $this->idleWorkers->shift();
}
} while (!$worker->isRunning());
$this->busyQueue->push($worker);
$this->workers[$worker] += 1;
return new Internal\PooledWorker($worker, $this->push);
}
/**
* Pushes the worker back into the queue.
*
* @param \Icicle\Concurrent\Worker\Worker $worker
*
* @throws \Icicle\Exception\InvalidArgumentError If the worker was not part of this queue.
*/
private function push(Worker $worker)
{
if (!$this->workers->contains($worker)) {
throw new InvalidArgumentError(
'The provided worker was not part of this queue.'
);
}
if (0 === ($this->workers[$worker] -= 1)) {
// Worker is completely idle, remove from busy queue and add to idle queue.
foreach ($this->busyQueue as $key => $busy) {
if ($busy === $worker) {
unset($this->busyQueue[$key]);
break;
}
}
$this->idleWorkers->push($worker);
}
}
}

View File

@ -1,250 +0,0 @@
<?php
namespace Icicle\Concurrent\Worker;
use Icicle\Awaitable;
use Icicle\Concurrent\Exception\StatusError;
use Icicle\Coroutine\Coroutine;
use Icicle\Exception\InvalidArgumentError;
class DefaultQueue implements Queue
{
/**
* @var \Icicle\Concurrent\Worker\WorkerFactory
*/
private $factory;
/**
* @var int The minimum number of workers the queue should spawn.
*/
private $minSize;
/**
* @var int The maximum number of workers the queue should spawn.
*/
private $maxSize;
/**
* @var \SplQueue
*/
private $idle;
/**
* @var \SplQueue
*/
private $busy;
/**
* @var \SplObjectStorage
*/
private $workers;
/**
* @var bool
*/
private $running = false;
/**
* @var \Closure
*/
private $push;
/**
* @param int|null $minSize The minimum number of workers the queue should spawn.
* Defaults to `Queue::DEFAULT_MIN_SIZE`.
* @param int|null $maxSize The maximum number of workers the queue should spawn.
* Defaults to `Queue::DEFAULT_MAX_SIZE`.
* @param \Icicle\Concurrent\Worker\WorkerFactory|null $factory Factory used to create new workers.
*
* @throws \Icicle\Exception\InvalidArgumentError If the min or max size are invalid.
*/
public function __construct($minSize = null, $maxSize = null, WorkerFactory $factory = null)
{
$minSize = $minSize ?: self::DEFAULT_MIN_SIZE;
$maxSize = $maxSize ?: self::DEFAULT_MAX_SIZE;
if (!is_int($minSize) || $minSize < 0) {
throw new InvalidArgumentError('Minimum size must be a non-negative integer.');
}
if (!is_int($maxSize) || $maxSize < 0 || $maxSize < $minSize) {
throw new InvalidArgumentError('Maximum size must be a non-negative integer at least '.$minSize.'.');
}
$this->factory = $factory ?: factory();
$this->minSize = $minSize;
$this->maxSize = $maxSize;
$this->workers = new \SplObjectStorage();
$this->idle = new \SplQueue();
$this->busy = new \SplQueue();
$this->push = function (Worker $worker) {
$this->push($worker);
};
}
/**
* {@inheritdoc}
*/
public function isRunning()
{
return $this->running;
}
/**
* {@inheritdoc}
*/
public function start()
{
if ($this->isRunning()) {
throw new StatusError('The worker queue has already been started.');
}
// Start up the pool with the minimum number of workers.
$count = $this->minSize;
while (--$count >= 0) {
$worker = $this->factory->create();
$worker->start();
$this->workers->attach($worker, 0);
$this->idle->push($worker);
}
$this->running = true;
}
/**
* {@inheritdoc}
*/
public function pull()
{
if (!$this->isRunning()) {
throw new StatusError('The queue is not running.');
}
do {
if ($this->idle->isEmpty()) {
if ($this->busy->count() >= $this->maxSize) {
// All possible workers busy, so shift from head (will be pushed back onto tail below).
$worker = $this->busy->shift();
} else {
// Max worker count has not been reached, so create another worker.
$worker = $this->factory->create();
$worker->start();
$this->workers->attach($worker, 0);
}
} else {
// Shift a worker off the idle queue.
$worker = $this->idle->shift();
}
} while (!$worker->isRunning());
$this->busy->push($worker);
$this->workers[$worker] += 1;
return new Internal\QueuedWorker($worker, $this->push);
}
/**
* Pushes the worker back into the queue.
*
* @param \Icicle\Concurrent\Worker\Worker $worker
*
* @throws \Icicle\Exception\InvalidArgumentError If the worker was not part of this queue.
*/
private function push(Worker $worker)
{
if (!$this->workers->contains($worker)) {
throw new InvalidArgumentError(
'The provided worker was not part of this queue.'
);
}
if (0 === ($this->workers[$worker] -= 1)) {
// Worker is completely idle, remove from busy queue and add to idle queue.
foreach ($this->busy as $key => $busy) {
if ($busy === $worker) {
unset($this->busy[$key]);
break;
}
}
$this->idle->push($worker);
}
}
/**
* {@inheritdoc}
*/
public function enqueue(Task $task)
{
$worker = $this->pull();
yield $worker->enqueue($task);
}
/**
* {@inheritdoc}
*/
public function getMinSize()
{
return $this->minSize;
}
/**
* {@inheritdoc}
*/
public function getMaxSize()
{
return $this->maxSize;
}
/**
* {@inheritdoc}
*/
public function getWorkerCount()
{
return $this->idle->count() + $this->busy->count();
}
/**
* {@inheritdoc}
*/
public function getIdleWorkerCount()
{
return $this->idle->count();
}
/**
* {@inheritdoc}
*/
public function shutdown()
{
if (!$this->isRunning()) {
throw new StatusError('The queue is not running.');
}
$this->running = false;
$shutdowns = [];
foreach ($this->workers as $worker) {
if ($worker->isRunning()) {
$shutdowns[] = new Coroutine($worker->shutdown());
}
}
yield Awaitable\reduce($shutdowns, function ($carry, $value) {
return $carry ?: $value;
}, 0);
}
/**
* {@inheritdoc}
*/
public function kill()
{
$this->running = false;
foreach ($this->workers as $worker) {
$worker->kill();
}
}
}

View File

@ -4,7 +4,7 @@ namespace Icicle\Concurrent\Worker\Internal;
use Icicle\Concurrent\Worker\Task;
use Icicle\Concurrent\Worker\Worker;
class QueuedWorker implements Worker
class PooledWorker implements Worker
{
/**
* @var callable

View File

@ -16,6 +16,16 @@ interface Pool extends Worker
*/
const DEFAULT_MAX_SIZE = 32;
/**
* Gets a worker from the pool. The worker is marked as busy and will only be reused if the pool runs out of
* idle workers. The worker will be automatically marked as idle once no references to the returned worker remain.
*
* @return \Icicle\Concurrent\Worker\Worker
*
* @throws \Icicle\Concurrent\Exception\StatusError If the queue is not running.
*/
public function get();
/**
* Gets the number of workers currently running in the pool.
*

View File

@ -1,95 +0,0 @@
<?php
namespace Icicle\Concurrent\Worker;
interface Queue
{
/**
* @var int The default minimum queue size.
*/
const DEFAULT_MIN_SIZE = 4;
/**
* @var int The default maximum queue size.
*/
const DEFAULT_MAX_SIZE = 32;
/**
* Pull a worker from the queue. The worker is marked as busy and will only be reused if the queue runs out of
* idle workers. The worker will be automatically marked as idle once no references to the returned worker remain.
*
* @return \Icicle\Concurrent\Worker\Worker
*
* @throws \Icicle\Concurrent\Exception\StatusError If the queue is not running.
*/
public function pull();
/**
* Pushes a worker into the queue, marking it as idle and available to be pulled from the queue again.
*
* @param \Icicle\Concurrent\Worker\Worker $worker
*
* @throws \Icicle\Concurrent\Exception\StatusError If the queue is not running.
* @throws \Icicle\Exception\InvalidArgumentError If the given worker is not part of this queue or was already
* pushed into the queue.
*/
//public function push(Worker $worker);
/**
* Checks if the queue is running.
*
* @return bool True if the queue is running, otherwise false.
*/
public function isRunning();
/**
* Starts the worker queue execution.
*
* When the worker queue starts up, the minimum number of workers will be created. This adds some overhead to
* starting the queue, but allows for greater performance during runtime.
*/
public function start();
/**
* Gracefully shuts down all workers in the queue.
*
* @coroutine
*
* @return \Generator
*
* @resolve int Exit code.
*/
public function shutdown();
/**
* Immediately kills all workers in the queue.
*/
public function kill();
/**
* Gets the minimum number of workers the queue may have idle.
*
* @return int The minimum number of workers.
*/
public function getMinSize();
/**
* Gets the maximum number of workers the queue may spawn to handle concurrent tasks.
*
* @return int The maximum number of workers.
*/
public function getMaxSize();
/**
* Gets the number of workers currently running in the pool.
*
* @return int The number of workers.
*/
public function getWorkerCount();
/**
* Gets the number of workers that are currently idle.
*
* @return int The number of idle workers.
*/
public function getIdleWorkerCount();
}

View File

@ -75,36 +75,12 @@ if (!function_exists(__NAMESPACE__ . '\pool')) {
}
/**
* Gets or sets the global worker queue instance.
*
* @param \Icicle\Concurrent\Worker\Queue|null $queue
*
* @return \Icicle\Concurrent\Worker\Queue
*/
function queue(Queue $queue = null)
{
static $instance;
if (null !== $queue) {
$instance = $queue;
} elseif (null === $instance) {
$instance = new DefaultQueue();
}
if (!$instance->isRunning()) {
$instance->start();
}
return $instance;
}
/**
* Pulls a worker from the global worker queue.
* Gets a worker from the global worker pool.
*
* @return \Icicle\Concurrent\Worker\Worker
*/
function pull()
function get()
{
return queue()->pull();
return pool()->get();
}
}

View File

@ -1,288 +0,0 @@
<?php
namespace Icicle\Tests\Concurrent\Worker;
use Icicle\Awaitable;
use Icicle\Concurrent\Worker\DefaultQueue;
use Icicle\Concurrent\Worker\Task;
use Icicle\Concurrent\Worker\Worker;
use Icicle\Concurrent\Worker\WorkerFactory;
use Icicle\Coroutine;
use Icicle\Loop;
use Icicle\Tests\Concurrent\TestCase;
class DefaultQueueTest extends TestCase
{
/**
* @param int $min
* @param int $max
*
* @return \Icicle\Concurrent\Worker\Queue
*/
protected function createQueue($min = null, $max = null)
{
$factory = $this->getMock(WorkerFactory::class);
$factory->method('create')->will($this->returnCallback(function () {
$running = true;
$mock = $this->getMock(Worker::class);
$mock->method('shutdown')
->will($this->returnCallback(function () use (&$running) {
$running = false;
yield 0;
}));
$mock->method('kill')
->will($this->returnCallback(function () use (&$running) {
$running = false;
}));
$mock->method('isRunning')
->will($this->returnCallback(function () use (&$running) {
return $running;
}));
$mock->method('enqueue')
->will($this->returnCallback(function (Task $task) use ($mock) {
yield $mock;
}));
return $mock;
}));
return new DefaultQueue($min, $max, $factory);
}
public function testIsRunning()
{
Coroutine\run(function () {
$queue = $this->createQueue();
$this->assertFalse($queue->isRunning());
$queue->start();
$this->assertTrue($queue->isRunning());
yield $queue->shutdown();
$this->assertFalse($queue->isRunning());
});
}
public function testGetMinSize()
{
$queue = $this->createQueue(7, 24);
$this->assertEquals(7, $queue->getMinSize());
}
public function testGetMaxSize()
{
$queue = $this->createQueue(3, 17);
$this->assertEquals(17, $queue->getMaxSize());
}
public function testMinWorkersSpawnedOnStart()
{
Coroutine\run(function () {
$queue = $this->createQueue(8, 32);
$queue->start();
$this->assertEquals(8, $queue->getWorkerCount());
yield $queue->shutdown();
});
}
public function testWorkersIdleOnStart()
{
Coroutine\run(function () {
$queue = $this->createQueue(8, 32);
$queue->start();
$this->assertEquals(8, $queue->getIdleWorkerCount());
yield $queue->shutdown();
});
}
public function testPullPushIsCyclical()
{
Coroutine\run(function () {
$queue = $this->createQueue(2, 2);
$queue->start();
$worker1 = $queue->pull();
$worker2 = $queue->pull();
$this->assertNotSame($worker1, $worker2);
$mock1 = (yield $worker1->enqueue($this->getMock(Task::class)));
$mock2 = (yield $worker2->enqueue($this->getMock(Task::class)));
unset($worker1, $worker2);
$this->assertSame(2, $queue->getWorkerCount());
$this->assertSame(2, $queue->getIdleWorkerCount());
$worker3 = $queue->pull();
$this->assertSame(1, $queue->getIdleWorkerCount());
$mock3 = (yield $worker3->enqueue($this->getMock(Task::class)));
$this->assertSame($mock1, $mock3);
$worker4 = $queue->pull();
$this->assertSame(0, $queue->getIdleWorkerCount());
$this->assertSame(2, $queue->getWorkerCount());
$mock4 = (yield $worker4->enqueue($this->getMock(Task::class)));
$this->assertSame($mock2, $mock4);
yield $queue->shutdown();
});
}
/**
* @depends testPullPushIsCyclical
*/
public function testPullReturnsLastPushedWhenBusy()
{
Coroutine\run(function () {
$queue = $this->createQueue(2, 2);
$queue->start();
$worker1 = $queue->pull();
$worker2 = $queue->pull();
$this->assertSame(2, $queue->getWorkerCount());
$this->assertSame(0, $queue->getIdleWorkerCount());
$mock2 = (yield $worker2->enqueue($this->getMock(Task::class)));
unset($worker2);
$this->assertSame(1, $queue->getIdleWorkerCount());
$worker3 = $queue->pull();
$this->assertSame(0, $queue->getIdleWorkerCount());
$this->assertSame(2, $queue->getWorkerCount());
$mock3 = (yield $worker3->enqueue($this->getMock(Task::class)));
$this->assertSame($mock2, $mock3);
yield $queue->shutdown();
});
}
/**
* @depends testPullPushIsCyclical
*/
public function testPullReturnsFirstBusyWhenAllBusyAndAtMax()
{
Coroutine\run(function () {
$queue = $this->createQueue(2, 2);
$queue->start();
$worker1 = $queue->pull();
$worker2 = $queue->pull();
$worker3 = $queue->pull();
$mock1 = (yield $worker1->enqueue($this->getMock(Task::class)));
$mock3 = (yield $worker3->enqueue($this->getMock(Task::class)));
$this->assertSame($mock1, $mock3);
$worker4 = $queue->pull();
$mock2 = (yield $worker2->enqueue($this->getMock(Task::class)));
$mock4 = (yield $worker4->enqueue($this->getMock(Task::class)));
$this->assertSame($mock2, $mock4);
$worker5 = $queue->pull();
$mock5 = (yield $worker5->enqueue($this->getMock(Task::class)));
$this->assertSame($mock1, $mock5);
yield $queue->shutdown();
});
}
/**
* @depends testPullReturnsFirstBusyWhenAllBusyAndAtMax
*/
public function testPullSpawnsNewWorkerWhenAllOthersBusyAndBelowMax()
{
Coroutine\run(function () {
$queue = $this->createQueue(2, 4);
$queue->start();
$worker1 = $queue->pull();
$worker2 = $queue->pull();
$this->assertSame(2, $queue->getWorkerCount());
$worker3 = $queue->pull();
$this->assertSame(3, $queue->getWorkerCount());
$mock1 = (yield $worker1->enqueue($this->getMock(Task::class)));
$mock2 = (yield $worker2->enqueue($this->getMock(Task::class)));
$mock3 = (yield $worker3->enqueue($this->getMock(Task::class)));
$this->assertNotSame($mock1, $mock3);
$this->assertNotSame($mock2, $mock3);
$worker4 = $queue->pull();
$this->assertSame(4, $queue->getWorkerCount());
$mock4 = (yield $worker4->enqueue($this->getMock(Task::class)));
$this->assertNotSame($mock1, $mock4);
$this->assertNotSame($mock2, $mock4);
$this->assertNotSame($mock3, $mock4);
$worker5 = $queue->pull();
$this->assertSame(4, $queue->getWorkerCount());
$mock5 = (yield $worker5->enqueue($this->getMock(Task::class)));
$this->assertSame($mock1, $mock5);
yield $queue->shutdown();
});
}
/**
* @depends testPullPushIsCyclical
*/
public function testPushOnlyMarksIdleAfterPushesEqualPulls()
{
Coroutine\run(function () {
$queue = $this->createQueue(2, 2);
$queue->start();
$worker1 = $queue->pull();
$worker2 = $queue->pull();
$worker3 = $queue->pull();
$mock1 = (yield $worker1->enqueue($this->getMock(Task::class)));
$mock2 = (yield $worker2->enqueue($this->getMock(Task::class)));
$mock3 = (yield $worker3->enqueue($this->getMock(Task::class)));
$this->assertSame($mock1, $mock3);
// Should only mark $worker2 as idle, not $worker3 even though it's pushed first.
unset($worker3, $worker2);
// Should pull $worker2 again.
$worker4 = $queue->pull();
$mock4 = (yield $worker4->enqueue($this->getMock(Task::class)));
$this->assertSame($mock2, $mock4);
// Unsetting $worker1 first, which should now be marked as idle (and so should $worker2/4)
unset($worker1, $worker4);
// Should pull $worker1 now since it was marked idle.
$worker5 = $queue->pull();
$mock5 = (yield $worker5->enqueue($this->getMock(Task::class)));
$this->assertSame($mock1, $mock5);
yield $queue->shutdown();
});
}
public function testKill()
{
$queue = $this->createQueue();
$queue->start();
$worker = $queue->pull();
$this->assertRunTimeLessThan([$queue, 'kill'], 0.5);
$this->assertFalse($queue->isRunning());
$this->assertFalse($worker->isRunning());
}
}

View File

@ -3,7 +3,6 @@ namespace Icicle\Tests\Concurrent\Worker;
use Icicle\Concurrent\Worker;
use Icicle\Concurrent\Worker\Environment;
use Icicle\Concurrent\Worker\Queue;
use Icicle\Concurrent\Worker\Pool;
use Icicle\Concurrent\Worker\Task;
use Icicle\Concurrent\Worker\WorkerFactory;
@ -53,38 +52,19 @@ class FunctionsTest extends TestCase
$this->assertSame($value, $coroutine->wait());
}
public function testGetQueue()
{
$queue = Worker\queue();
$this->assertInstanceOf(Queue::class, $queue);
}
/**
* @depends testGetQueue
* @depends testSetPool
*/
public function testSetQueue()
public function testGet()
{
$queue = $this->getMock(Queue::class);
Worker\queue($queue);
$this->assertSame($queue, Worker\queue());
}
/**
* @depends testSetQueue
*/
public function testPull()
{
$queue = $this->getMock(Queue::class);
$queue->expects($this->once())
->method('pull')
$pool = $this->getMock(Pool::class);
$pool->expects($this->once())
->method('get')
->will($this->returnValue($this->getMock(Worker\Worker::class)));
Worker\queue($queue);
Worker\pool($pool);
$worker = Worker\pull();
$worker = Worker\get();
}
public function testGetFactory()