maxSize = $maxSize; $this->minSize = $minSize; // Use the global factory if none is given. $this->factory = $factory ?: factory(); $this->workers = new \SplObjectStorage; $this->idleWorkers = new \SplQueue; $this->busyQueue = new \SplQueue; $this->push = $this->callableFromInstanceMethod("push"); } /** * Checks if the pool is running. * * @return bool True if the pool is running, otherwise false. */ public function isRunning(): bool { return $this->running; } /** * Checks if the pool has any idle workers. * * @return bool True if the pool has at least one idle worker, otherwise false. */ public function isIdle(): bool { return $this->idleWorkers->count() > 0; } /** * {@inheritdoc} */ public function getMinSize(): int { return $this->minSize; } /** * {@inheritdoc} */ public function getMaxSize(): int { return $this->maxSize; } /** * {@inheritdoc} */ public function getWorkerCount(): int { return $this->workers->count(); } /** * {@inheritdoc} */ public function getIdleWorkerCount(): int { return $this->idleWorkers->count(); } /** * Starts the worker pool execution. * * When the worker pool starts up, the minimum number of workers will be created. This adds some overhead to * starting the pool, but allows for greater performance during runtime. */ public function start() { if ($this->isRunning()) { throw new StatusError('The worker pool has already been started.'); } // Start up the pool with the minimum number of workers. $count = $this->minSize; while (--$count >= 0) { $worker = $this->createWorker(); $this->idleWorkers->enqueue($worker); } $this->running = true; } /** * Enqueues a task to be executed by the worker pool. * * @param Task $task The task to enqueue. * * @return \Interop\Async\Awaitable The return value of Task::run(). * * @throws \Amp\Parallel\StatusError If the pool has not been started. * @throws \Amp\Parallel\TaskException If the task throws an exception. */ public function enqueue(Task $task): Awaitable { return new Coroutine($this->doEnqueue($this->pull(), $task)); } /** * @coroutine * * Keeps the worker marked as busy until the task has completed. * * @param \Amp\Parallel\Worker\Worker $worker * @param \Amp\Parallel\Worker\Task $task * * @return \Generator */ public function doEnqueue(Worker $worker, Task $task): \Generator { try { $result = yield $worker->enqueue($task); } finally { $this->push($worker); } return $result; } /** * Shuts down the pool and all workers in it. * * @coroutine * * @return \Interop\Async\Awaitable Array of exit status from all workers. * * @throws \Amp\Parallel\StatusError If the pool has not been started. */ public function shutdown(): Awaitable { if (!$this->isRunning()) { throw new StatusError('The pool is not running.'); } return new Coroutine($this->doShutdown()); } /** * Shuts down the pool and all workers in it. * * @coroutine * * @return \Generator * * @throws \Amp\Parallel\StatusError If the pool has not been started. */ private function doShutdown(): \Generator { $this->running = false; $shutdowns = []; foreach ($this->workers as $worker) { if ($worker->isRunning()) { $shutdowns[] = $worker->shutdown(); } } return yield \Amp\all($shutdowns); } /** * Kills all workers in the pool and halts the worker pool. */ public function kill() { $this->running = false; foreach ($this->workers as $worker) { $worker->kill(); } } /** * Creates a worker and adds them to the pool. * * @return Worker The worker created. */ private function createWorker() { $worker = $this->factory->create(); $worker->start(); $this->workers->attach($worker, 0); return $worker; } /** * {@inheritdoc} */ public function get(): Worker { return new Internal\PooledWorker($this->pull(), $this->push); } /** * Pulls a worker from the pool. The worker should be put back into the pool with push() to be marked as idle. * * @return \Amp\Parallel\Worker\Worker * @throws \Amp\Parallel\StatusError */ protected function pull(): Worker { if (!$this->isRunning()) { throw new StatusError("The queue is not running"); } do { if ($this->idleWorkers->isEmpty()) { if ($this->getWorkerCount() >= $this->maxSize) { // All possible workers busy, so shift from head (will be pushed back onto tail below). $worker = $this->busyQueue->shift(); } else { // Max worker count has not been reached, so create another worker. $worker = $this->createWorker(); } } else { // Shift a worker off the idle queue. $worker = $this->idleWorkers->shift(); } if ($worker->isRunning()) { break; } $this->workers->detach($worker); } while (true); $this->busyQueue->push($worker); $this->workers[$worker] += 1; return $worker; } /** * Pushes the worker back into the queue. * * @param \Amp\Parallel\Worker\Worker $worker * * @throws \Error If the worker was not part of this queue. */ protected function push(Worker $worker) { if (!$this->workers->contains($worker)) { throw new \Error("The provided worker was not part of this queue"); } if (($this->workers[$worker] -= 1) === 0) { // Worker is completely idle, remove from busy queue and add to idle queue. foreach ($this->busyQueue as $key => $busy) { if ($busy === $worker) { unset($this->busyQueue[$key]); break; } } $this->idleWorkers->push($worker); } } }