2016-12-30 02:16:04 +01:00
|
|
|
<?php
|
2015-12-05 06:50:32 +01:00
|
|
|
|
2016-08-23 23:47:40 +02:00
|
|
|
namespace Amp\Parallel\Worker;
|
2016-08-18 18:04:48 +02:00
|
|
|
|
2017-12-08 04:26:55 +01:00
|
|
|
use Amp\Parallel\Context\StatusError;
|
2017-05-18 09:51:31 +02:00
|
|
|
use Amp\Promise;
|
2015-12-05 06:50:32 +01:00
|
|
|
|
|
|
|
/**
 * Provides a pool of workers that can be used to execute multiple tasks asynchronously.
 *
 * A worker pool is a collection of workers that can perform multiple tasks
 * simultaneously. Idle workers are reused first, new workers are created until
 * the maximum size is reached, and after that busy workers are handed out in
 * rotation.
 */
class DefaultPool implements Pool
{
    /** @var bool Indicates if the pool is currently running. */
    private $running = true;

    /** @var int The maximum number of workers the pool should spawn. */
    private $maxSize;

    /** @var WorkerFactory A worker factory to be used to create new workers. */
    private $factory;

    /**
     * @var \SplObjectStorage A collection of all workers in the pool. The data attached to each
     *     worker counts how many times it has been pulled without being pushed back yet.
     */
    private $workers;

    /** @var \SplQueue A collection of idle workers. */
    private $idleWorkers;

    /** @var \SplQueue A queue of workers that have been assigned to tasks. */
    private $busyQueue;

    /** @var \Closure Releases a previously pulled worker; also handed to the workers returned by get(). */
    private $push;

    /**
     * Creates a new worker pool.
     *
     * NOTE(review): a maximum size of 0 passes validation, but pull() would then try to shift
     * a worker from an empty busy queue. Confirm whether 0 should be rejected here.
     *
     * @param int $maxSize The maximum number of workers the pool should spawn.
     *     Defaults to `Pool::DEFAULT_MAX_SIZE`.
     * @param \Amp\Parallel\Worker\WorkerFactory|null $factory A worker factory to be used to create
     *     new workers. The global factory is used if none is given.
     *
     * @throws \Error If the maximum size is negative.
     */
    public function __construct(int $maxSize = self::DEFAULT_MAX_SIZE, WorkerFactory $factory = null)
    {
        if ($maxSize < 0) {
            throw new \Error("Maximum size must be a non-negative integer");
        }

        $this->maxSize = $maxSize;

        // Use the global factory if none is given.
        $this->factory = $factory ?? factory();

        $this->workers = new \SplObjectStorage;
        $this->idleWorkers = new \SplQueue;
        $this->busyQueue = new \SplQueue;

        // Local aliases so the closure below can be static, i.e. it does not bind $this.
        $workers = $this->workers;
        $idleWorkers = $this->idleWorkers;
        $busyQueue = $this->busyQueue;

        $this->push = static function (Worker $worker) use ($workers, $idleWorkers, $busyQueue) {
            // pull() detaches workers that are no longer running, even if releases for them are
            // still outstanding. Reading $workers[$worker] for a detached worker would throw, so
            // a release for a worker the pool no longer tracks is ignored.
            if (!$workers->contains($worker)) {
                return;
            }

            if (($workers[$worker] -= 1) !== 0) {
                return; // Worker is still in use elsewhere.
            }

            // Worker is completely idle, remove from busy queue and add to idle queue.
            foreach ($busyQueue as $key => $busy) {
                if ($busy === $worker) {
                    unset($busyQueue[$key]);
                    break;
                }
            }

            $idleWorkers->push($worker);
        };
    }

    /**
     * Kills all workers if the pool is still running when it is destroyed.
     */
    public function __destruct()
    {
        if ($this->isRunning()) {
            $this->kill();
        }
    }

    /**
     * Checks if the pool is running.
     *
     * @return bool True if the pool is running, otherwise false.
     */
    public function isRunning(): bool
    {
        return $this->running;
    }

    /**
     * Checks if the pool has any idle workers.
     *
     * @return bool True if the pool has at least one idle worker or has not created any
     *     worker yet, otherwise false.
     */
    public function isIdle(): bool
    {
        return $this->idleWorkers->count() > 0 || $this->workers->count() === 0;
    }

    /**
     * {@inheritdoc}
     */
    public function getMaxSize(): int
    {
        return $this->maxSize;
    }

    /**
     * {@inheritdoc}
     */
    public function getWorkerCount(): int
    {
        return $this->workers->count();
    }

    /**
     * {@inheritdoc}
     */
    public function getIdleWorkerCount(): int
    {
        return $this->idleWorkers->count();
    }

    /**
     * Enqueues a task to be executed by the worker pool.
     *
     * @param Task $task The task to enqueue.
     *
     * @return \Amp\Promise<mixed> The return value of Task::run().
     *
     * @throws \Amp\Parallel\Context\StatusError If the pool has been shutdown.
     * @throws \Amp\Parallel\Worker\TaskException If the task throws an exception.
     */
    public function enqueue(Task $task): Promise
    {
        $worker = $this->pull();

        $promise = $worker->enqueue($task);

        // Release the worker once the task has resolved, whether it succeeded or failed.
        // The callback is not static, so it references this pool until the promise resolves.
        $promise->onResolve(function () use ($worker) {
            ($this->push)($worker);
        });

        return $promise;
    }

    /**
     * Shuts down the pool and all workers in it.
     *
     * @return \Amp\Promise<int[]> Array of exit status from all workers.
     *
     * @throws \Amp\Parallel\Context\StatusError If the pool is no longer running, i.e. it has
     *     already been shut down or killed.
     */
    public function shutdown(): Promise
    {
        if (!$this->isRunning()) {
            throw new StatusError("The pool was shutdown");
        }

        $this->running = false;

        $shutdowns = [];
        foreach ($this->workers as $worker) {
            // Only workers that are still running are asked to shut down.
            if ($worker->isRunning()) {
                $shutdowns[] = $worker->shutdown();
            }
        }

        return Promise\all($shutdowns);
    }

    /**
     * Kills all workers in the pool and halts the worker pool.
     */
    public function kill()
    {
        $this->running = false;

        foreach ($this->workers as $worker) {
            $worker->kill();
        }
    }

    /**
     * Creates a worker and adds them to the pool.
     *
     * @return Worker The worker created.
     */
    private function createWorker(): Worker
    {
        $worker = $this->factory->create();

        // A new worker starts with a usage count of zero; pull() increments it.
        $this->workers->attach($worker, 0);

        return $worker;
    }

    /**
     * {@inheritdoc}
     */
    public function get(): Worker
    {
        // The wrapper receives the push closure so it can release the pulled worker.
        return new Internal\PooledWorker($this->pull(), $this->push);
    }

    /**
     * Pulls a worker from the pool. The worker should be put back into the pool with push() to be marked as idle.
     *
     * An idle worker is preferred. If there is none, a new worker is created while the pool is
     * below its maximum size; otherwise the worker at the head of the busy queue is reused.
     * Workers found to be no longer running are removed from the pool and another one is tried.
     *
     * @return \Amp\Parallel\Worker\Worker
     *
     * @throws \Amp\Parallel\Context\StatusError If the pool is no longer running.
     */
    protected function pull(): Worker
    {
        if (!$this->isRunning()) {
            throw new StatusError("The pool was shutdown");
        }

        do {
            if ($this->idleWorkers->isEmpty()) {
                if ($this->getWorkerCount() >= $this->maxSize) {
                    // All possible workers busy, so shift from head (will be pushed back onto tail below).
                    $worker = $this->busyQueue->shift();
                } else {
                    // Max worker count has not been reached, so create another worker.
                    $worker = $this->createWorker();
                    break;
                }
            } else {
                // Shift a worker off the idle queue.
                $worker = $this->idleWorkers->shift();
            }

            if ($worker->isRunning()) {
                break;
            }

            // The worker stopped running: forget it and try the next candidate.
            $this->workers->detach($worker);
        } while (true);

        $this->busyQueue->push($worker);
        $this->workers[$worker] += 1;

        return $worker;
    }

    /**
     * Pushes the worker back into the pool and marks it as idle once nothing else is using it.
     *
     * A worker that is not (or no longer) part of this pool, e.g. one removed by pull() because
     * it stopped running, is ignored.
     *
     * @param \Amp\Parallel\Worker\Worker $worker
     */
    protected function push(Worker $worker)
    {
        ($this->push)($worker); // Kept for BC
    }
}
|