maxSize = $maxSize;
        $this->minSize = $minSize;

        // Use the global factory if none is given.
        $this->factory = $factory ?: factory();

        // Maps every worker owned by the pool to an integer counter (see get() and push()).
        $this->workers = new \SplObjectStorage();
        $this->idleWorkers = new \SplQueue();
        $this->busyQueue = new \SplQueue();

        // Closure handed to each Internal\PooledWorker created by get(); it forwards to the private push() method.
        $this->push = function (Worker $worker) {
            $this->push($worker);
        };
    }

    /**
     * Checks if the pool is running.
     *
     * @return bool True if the pool is running, otherwise false.
     */
    public function isRunning(): bool
    {
        return $this->running;
    }

    /**
     * Checks if the pool has any idle workers.
     *
     * @return bool True if the pool has at least one idle worker, otherwise false.
     */
    public function isIdle(): bool
    {
        return !$this->idleWorkers->isEmpty();
    }

    /**
     * {@inheritdoc}
     */
    public function getMinSize(): int
    {
        return $this->minSize;
    }

    /**
     * {@inheritdoc}
     */
    public function getMaxSize(): int
    {
        return $this->maxSize;
    }

    /**
     * {@inheritdoc}
     */
    public function getWorkerCount(): int
    {
        return $this->workers->count();
    }

    /**
     * {@inheritdoc}
     */
    public function getIdleWorkerCount(): int
    {
        return $this->idleWorkers->count();
    }

    /**
     * Starts the worker pool execution.
     *
     * When the worker pool starts up, the minimum number of workers will be created. This adds some overhead to
     * starting the pool, but allows for greater performance during runtime.
     *
     * @throws StatusError If the pool has already been started.
     */
    public function start()
    {
        if ($this->isRunning()) {
            throw new StatusError('The worker pool has already been started.');
        }

        // Start up the pool with the minimum number of workers; each new worker begins on the idle queue.
        for ($remaining = $this->minSize; $remaining > 0; --$remaining) {
            $this->idleWorkers->enqueue($this->createWorker());
        }

        // Only flag the pool as running once every initial worker has been created.
        $this->running = true;
    }

    /**
     * Enqueues a task to be executed by the worker pool.
     *
     * @coroutine
     *
     * @param Task $task The task to enqueue.
     *
     * @return \Generator
     *
     * @resolve mixed The return value of the task.
     *
     * @throws \Icicle\Concurrent\Exception\StatusError If the pool has not been started.
     * @throws \Icicle\Concurrent\Exception\TaskException If the task throws an exception.
*/
    public function enqueue(Task $task): \Generator
    {
        // Pick (or create) a worker through the pool's own selection logic, then delegate the task to it.
        $worker = $this->get();

        return yield from $worker->enqueue($task);
    }

    /**
     * Shuts down the pool and all workers in it.
     *
     * The pool is flagged as not running before any worker shutdown is awaited.
     *
     * @coroutine
     *
     * @return \Generator
     *
     * @resolve mixed The reduced result of the worker shutdowns, or 0 when none is truthy.
     *
     * @throws \Icicle\Concurrent\Exception\StatusError If the pool has not been started.
     */
    public function shutdown(): \Generator
    {
        if (!$this->isRunning()) {
            throw new StatusError('The pool is not running.');
        }

        $this->running = false;

        // Only workers that are still running are asked to shut down.
        $shutdowns = [];
        foreach ($this->workers as $worker) {
            if ($worker->isRunning()) {
                $shutdowns[] = new Coroutine($worker->shutdown());
            }
        }

        // The callback keeps the first truthy carried value, starting from 0.
        // NOTE(review): presumably this yields the first non-zero worker exit code — confirm against Awaitable\reduce().
        return yield Awaitable\reduce($shutdowns, function ($carry, $value) {
            return $carry ?: $value;
        }, 0);
    }

    /**
     * Kills all workers in the pool and halts the worker pool.
     */
    public function kill()
    {
        $this->running = false;

        foreach ($this->workers as $worker) {
            $worker->kill();
        }
    }

    /**
     * Creates a worker and adds them to the pool.
     *
     * The worker is started immediately and registered with a counter of 0. It is not placed on the idle or busy
     * queue here; that is left to the caller.
     *
     * @return Worker The worker created.
     */
    private function createWorker(): Worker
    {
        $worker = $this->factory->create();
        $worker->start();

        $this->workers->attach($worker, 0);

        return $worker;
    }

    /**
     * {@inheritdoc}
     *
     * Selection order: an idle worker if one exists; otherwise a new worker while the pool is below its maximum
     * size; otherwise the worker at the head of the busy queue. Workers found to be no longer running are removed
     * from the pool and the selection is retried.
     *
     * @throws StatusError If the pool is not running.
     */
    public function get(): Worker
    {
        if (!$this->isRunning()) {
            throw new StatusError('The queue is not running.');
        }

        do {
            if (!$this->idleWorkers->isEmpty()) {
                // Shift a worker off the idle queue.
                $worker = $this->idleWorkers->shift();
            } elseif ($this->getWorkerCount() >= $this->maxSize) {
                // All possible workers busy, so shift from head (will be pushed back onto tail below).
                $worker = $this->busyQueue->shift();
            } else {
                // Max worker count has not been reached, so create another worker.
                $worker = $this->createWorker();
            }

            if ($worker->isRunning()) {
                break;
            }

            // The selected worker has stopped: forget it and pick again.
            $this->workers->detach($worker);
        } while (true);

        // Record the worker as busy and count one more outstanding handle for it; push() decrements this counter.
        $this->busyQueue->push($worker);
        $this->workers[$worker] += 1;

        return new Internal\PooledWorker($worker, $this->push);
    }

    /**
     * Pushes the worker back into the queue.
*
     * Decrements the counter stored for the worker (incremented by get()). When the counter reaches zero the
     * worker is removed from the busy queue and appended to the idle queue.
     *
     * @param \Icicle\Concurrent\Worker\Worker $worker
     *
     * @throws \Icicle\Exception\InvalidArgumentError If the worker was not part of this queue.
     */
    private function push(Worker $worker)
    {
        if (!$this->workers->contains($worker)) {
            throw new InvalidArgumentError(
                'The provided worker was not part of this queue.'
            );
        }

        if (0 === ($this->workers[$worker] -= 1)) {
            // Worker is completely idle, remove from busy queue and add to idle queue.
            foreach ($this->busyQueue as $key => $busy) {
                if ($busy === $worker) {
                    unset($this->busyQueue[$key]);

                    // Stop right after the removal so the queue is not iterated further once modified.
                    break;
                }
            }

            $this->idleWorkers->push($worker);
        }
    }
}