1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-23 06:21:12 +01:00
parallel/lib/Worker/DefaultPool.php

256 lines
6.8 KiB
PHP
Raw Normal View History

2016-12-29 19:16:04 -06:00
<?php
2015-12-04 23:50:32 -06:00
2016-08-23 16:47:40 -05:00
namespace Amp\Parallel\Worker;
2016-08-18 11:04:48 -05:00
2017-12-07 21:26:55 -06:00
use Amp\Parallel\Context\StatusError;
2017-05-18 09:51:31 +02:00
use Amp\Promise;
2015-12-04 23:50:32 -06:00
/**
* Provides a pool of workers that can be used to execute multiple tasks asynchronously.
*
* A worker pool is a collection of worker threads that can perform multiple
* tasks simultaneously. The load on each worker is balanced such that tasks
* are completed as soon as possible and workers are used efficiently.
*/
2018-10-21 10:54:46 -05:00
final class DefaultPool implements Pool
2018-10-07 09:50:45 -05:00
{
2016-08-26 10:10:03 -05:00
/** @var bool Indicates if the pool is currently running. */
private $running = true;
2015-12-04 23:50:32 -06:00
2016-08-26 10:10:03 -05:00
/** @var int The maximum number of workers the pool should spawn. */
2015-12-04 23:50:32 -06:00
private $maxSize;
2016-08-26 10:10:03 -05:00
/** @var WorkerFactory A worker factory to be used to create new workers. */
2015-12-04 23:50:32 -06:00
private $factory;
2016-08-26 10:10:03 -05:00
/** @var \SplObjectStorage A collection of all workers in the pool. */
private $workers;
2015-12-04 23:50:32 -06:00
2016-08-26 10:10:03 -05:00
/** @var \SplQueue A collection of idle workers. */
2015-12-04 23:50:32 -06:00
private $idleWorkers;
2016-08-26 10:10:03 -05:00
/** @var \SplQueue A queue of workers that have been assigned to tasks. */
2015-12-04 23:50:32 -06:00
private $busyQueue;
2016-08-26 10:10:03 -05:00
/** @var \Closure */
private $push;
/** @var \Amp\Promise|null */
private $exitStatus;
2015-12-04 23:50:32 -06:00
/**
* Creates a new worker pool.
*
2017-12-12 21:39:51 -06:00
* @param int $maxSize The maximum number of workers the pool should spawn.
2015-12-16 16:39:25 -06:00
* Defaults to `Pool::DEFAULT_MAX_SIZE`.
2016-08-23 16:47:40 -05:00
* @param \Amp\Parallel\Worker\WorkerFactory|null $factory A worker factory to be used to create
2015-12-04 23:50:32 -06:00
* new workers.
*
2016-08-18 11:04:48 -05:00
* @throws \Error
2015-12-04 23:50:32 -06:00
*/
2018-10-07 09:50:45 -05:00
public function __construct(int $maxSize = self::DEFAULT_MAX_SIZE, WorkerFactory $factory = null)
{
if ($maxSize < 0) {
throw new \Error("Maximum size must be a non-negative integer");
2015-12-04 23:50:32 -06:00
}
$this->maxSize = $maxSize;
2016-01-11 09:32:06 -06:00
// Use the global factory if none is given.
$this->factory = $factory ?: factory();
2015-12-04 23:50:32 -06:00
2016-08-18 11:04:48 -05:00
$this->workers = new \SplObjectStorage;
$this->idleWorkers = new \SplQueue;
$this->busyQueue = new \SplQueue;
$workers = $this->workers;
$idleWorkers = $this->idleWorkers;
$busyQueue = $this->busyQueue;
$this->push = static function (Worker $worker) use ($workers, $idleWorkers, $busyQueue) {
\assert($workers->contains($worker), "The provided worker was not part of this queue");
if (($workers[$worker] -= 1) === 0) {
// Worker is completely idle, remove from busy queue and add to idle queue.
foreach ($busyQueue as $key => $busy) {
if ($busy === $worker) {
unset($busyQueue[$key]);
break;
}
}
$idleWorkers->push($worker);
}
};
2015-12-04 23:50:32 -06:00
}
2018-10-07 10:16:49 -05:00
public function __destruct()
{
if ($this->isRunning()) {
$this->kill();
}
}
2015-12-04 23:50:32 -06:00
/**
* Checks if the pool is running.
*
* @return bool True if the pool is running, otherwise false.
*/
2018-10-07 09:50:45 -05:00
public function isRunning(): bool
{
2015-12-04 23:50:32 -06:00
return $this->running;
}
/**
* Checks if the pool has any idle workers.
*
* @return bool True if the pool has at least one idle worker, otherwise false.
*/
2018-10-07 09:50:45 -05:00
public function isIdle(): bool
{
return $this->idleWorkers->count() > 0 || $this->workers->count() === 0;
2015-12-04 23:50:32 -06:00
}
/**
2015-12-16 15:53:53 -06:00
* {@inheritdoc}
2015-12-04 23:50:32 -06:00
*/
2018-10-07 09:50:45 -05:00
public function getMaxSize(): int
{
2015-12-04 23:50:32 -06:00
return $this->maxSize;
}
/**
* {@inheritdoc}
*/
2018-10-07 09:50:45 -05:00
public function getWorkerCount(): int
{
2015-12-16 15:53:53 -06:00
return $this->workers->count();
2015-12-04 23:50:32 -06:00
}
/**
* {@inheritdoc}
*/
2018-10-07 09:50:45 -05:00
public function getIdleWorkerCount(): int
{
2015-12-04 23:50:32 -06:00
return $this->idleWorkers->count();
}
/**
* Enqueues a task to be executed by the worker pool.
*
* @param Task $task The task to enqueue.
*
* @return \Amp\Promise<mixed> The return value of Task::run().
2015-12-04 23:50:32 -06:00
*
* @throws \Amp\Parallel\Context\StatusError If the pool has been shutdown.
2017-07-17 22:53:19 -05:00
* @throws \Amp\Parallel\Worker\TaskException If the task throws an exception.
2015-12-04 23:50:32 -06:00
*/
2018-10-07 09:50:45 -05:00
public function enqueue(Task $task): Promise
{
$worker = $this->pull();
2017-05-18 09:51:31 +02:00
$promise = $worker->enqueue($task);
$promise->onResolve(function () use ($worker) {
($this->push)($worker);
});
return $promise;
2015-12-04 23:50:32 -06:00
}
/**
* Shuts down the pool and all workers in it.
*
* @return \Amp\Promise<int[]> Array of exit status from all workers.
2015-12-04 23:50:32 -06:00
*
2017-12-07 21:26:55 -06:00
* @throws \Amp\Parallel\Context\StatusError If the pool has not been started.
2015-12-04 23:50:32 -06:00
*/
2018-10-07 09:50:45 -05:00
public function shutdown(): Promise
{
if ($this->exitStatus) {
return $this->exitStatus;
2015-12-04 23:50:32 -06:00
}
2017-05-18 09:51:31 +02:00
$this->running = false;
2015-12-04 23:50:32 -06:00
$shutdowns = [];
foreach ($this->workers as $worker) {
2015-12-16 15:53:53 -06:00
if ($worker->isRunning()) {
2016-08-18 11:04:48 -05:00
$shutdowns[] = $worker->shutdown();
2015-12-16 15:53:53 -06:00
}
}
2015-12-04 23:50:32 -06:00
return $this->exitStatus = Promise\all($shutdowns);
2015-12-04 23:50:32 -06:00
}
/**
* Kills all workers in the pool and halts the worker pool.
*/
2018-10-07 09:50:45 -05:00
public function kill()
{
$this->running = false;
2015-12-04 23:50:32 -06:00
foreach ($this->workers as $worker) {
$worker->kill();
}
}
/**
* Creates a worker and adds them to the pool.
*
* @return Worker The worker created.
*/
private function createWorker(): Worker
2018-10-07 09:50:45 -05:00
{
2015-12-04 23:50:32 -06:00
$worker = $this->factory->create();
$this->workers->attach($worker, 0);
2015-12-04 23:50:32 -06:00
return $worker;
}
/**
* {@inheritdoc}
*/
public function getWorker(): Worker
2018-10-07 09:50:45 -05:00
{
2016-08-30 17:36:21 -05:00
return new Internal\PooledWorker($this->pull(), $this->push);
}
2017-05-18 09:51:31 +02:00
2016-08-30 17:36:21 -05:00
/**
* Pulls a worker from the pool.
2016-08-30 17:36:21 -05:00
*
* @return \Amp\Parallel\Worker\Worker
2017-12-07 21:26:55 -06:00
* @throws \Amp\Parallel\Context\StatusError
2016-08-30 17:36:21 -05:00
*/
private function pull(): Worker
2018-10-07 09:50:45 -05:00
{
if (!$this->isRunning()) {
throw new StatusError("The pool was shutdown");
}
do {
if ($this->idleWorkers->isEmpty()) {
if ($this->getWorkerCount() >= $this->maxSize) {
// All possible workers busy, so shift from head (will be pushed back onto tail below).
$worker = $this->busyQueue->shift();
} else {
// Max worker count has not been reached, so create another worker.
$worker = $this->createWorker();
break;
}
} else {
// Shift a worker off the idle queue.
$worker = $this->idleWorkers->shift();
}
if ($worker->isRunning()) {
break;
}
$this->workers->detach($worker);
} while (true);
$this->busyQueue->push($worker);
$this->workers[$worker] += 1;
2017-05-18 09:51:31 +02:00
2016-08-30 17:36:21 -05:00
return $worker;
}
2015-12-04 23:50:32 -06:00
}