maxSize = $maxSize;
        $this->minSize = $minSize;

        // Create the default factory if none is given.
        $this->factory = $factory ?: new DefaultWorkerFactory();

        $this->workers = new \SplObjectStorage();
        $this->idleWorkers = new \SplQueue();
        $this->busyQueue = new \SplQueue();
    }

    /**
     * Reports whether the pool is currently running.
     *
     * @return bool True if the pool is running, otherwise false.
     */
    public function isRunning()
    {
        return $this->running;
    }

    /**
     * Reports whether at least one worker is sitting in the idle queue.
     *
     * @return bool True if the pool has at least one idle worker, otherwise false.
     */
    public function isIdle()
    {
        return !$this->idleWorkers->isEmpty();
    }

    /**
     * Gets the minimum number of workers the pool may have idle.
     *
     * @return int The minimum number of workers.
     */
    public function getMinSize()
    {
        return $this->minSize;
    }

    /**
     * Gets the maximum number of workers the pool may spawn to handle concurrent tasks.
     *
     * @return int The maximum number of workers.
     */
    public function getMaxSize()
    {
        return $this->maxSize;
    }

    /**
     * {@inheritdoc}
     */
    public function getWorkerCount()
    {
        return $this->workers->count();
    }

    /**
     * {@inheritdoc}
     */
    public function getIdleWorkerCount()
    {
        return count($this->idleWorkers);
    }

    /**
     * Starts the worker pool execution.
     *
     * The minimum number of workers is created up front and placed in the idle queue. This adds some overhead
     * to starting the pool, but allows for greater performance during runtime.
     *
     * @throws StatusError If the pool has already been started.
     */
    public function start()
    {
        if ($this->isRunning()) {
            throw new StatusError('The worker pool has already been started.');
        }

        // Pre-spawn the minimum number of workers; each one starts out idle.
        for ($remaining = $this->minSize; --$remaining >= 0;) {
            $this->idleWorkers->enqueue($this->createWorker());
        }

        $this->running = true;
    }

    /**
     * Enqueues a task to be executed by the worker pool.
     *
     * @coroutine
     *
     * @param Task $task The task to enqueue.
     *
     * @return \Generator
     *
     * @resolve mixed The return value of the task.
*
     * @throws \Icicle\Concurrent\Exception\StatusError If the pool has not been started.
     * @throws \Icicle\Concurrent\Exception\TaskException If the task throws an exception.
     */
    public function enqueue(Task $task)
    {
        if (!$this->isRunning()) {
            throw new StatusError('The worker pool has not been started.');
        }

        // Pick a worker: prefer an idle one, then spawn if below the cap, otherwise wait in line.
        if (!$this->idleWorkers->isEmpty()) {
            $worker = $this->idleWorkers->dequeue();
        } elseif ($this->getWorkerCount() < $this->maxSize) {
            // We are allowed to spawn another worker if needed, so do so.
            $worker = $this->createWorker();
        } else {
            // We have no other choice but to wait for a worker to be freed up. The awaitable is resolved with
            // a worker by whichever task finishes next (see the finally block below).
            // NOTE(review): close() cancels queued awaitables; presumably that makes this yield throw — confirm.
            $delayed = new Delayed();
            $this->busyQueue->enqueue($delayed);
            $worker = (yield $delayed);
        }

        try {
            $result = (yield $worker->enqueue($task));
        } finally {
            if (!$worker->isRunning()) {
                // Worker crashed or was stopped, so discard it.
                $this->workers->detach($worker);

                // Only spin up a replacement while the pool is still running. After shutdown() or kill()
                // nothing would ever stop the new worker again.
                $worker = $this->isRunning() ? $this->createWorker() : null;
            }

            if (null !== $worker) {
                // If someone is waiting on a worker, give it to them instead. Entries that are no longer
                // pending are skipped so the worker is never dropped.
                $handedOff = false;

                while (!$this->busyQueue->isEmpty()) {
                    /** @var \Icicle\Awaitable\Delayed $delayed */
                    $delayed = $this->busyQueue->dequeue();

                    if ($delayed->isPending()) {
                        $delayed->resolve($worker);
                        $handedOff = true;
                        break;
                    }
                }

                if (!$handedOff) {
                    // No tasks are waiting, so add the worker to the idle queue.
                    $this->idleWorkers->enqueue($worker);
                }
            }
        }

        yield $result;
    }

    /**
     * Shuts down the pool and all workers in it.
     *
     * Queued tasks that are still waiting for a worker are rejected first, then every worker attached to the
     * pool is asked to shut down.
     *
     * @coroutine
     *
     * @return \Generator
     *
     * @resolve mixed The first truthy value produced by a worker shutdown, or 0 if there is none.
     *
     * @throws \Icicle\Concurrent\Exception\StatusError If the pool has not been started.
     */
    public function shutdown()
    {
        if (!$this->isRunning()) {
            throw new StatusError('The pool is not running.');
        }

        $this->close();

        // Start all worker shutdowns up front so they can proceed together.
        $shutdowns = [];

        foreach ($this->workers as $worker) {
            $shutdowns[] = new Coroutine($worker->shutdown());
        }

        // Keep the first truthy shutdown result, starting from 0.
        yield Awaitable\reduce($shutdowns, function ($carry, $value) {
            return $carry ?: $value;
        }, 0);
    }

    /**
     * Kills all workers in the pool and halts the worker pool.
*/
    public function kill()
    {
        // Reject queued tasks and mark the pool as stopped before touching the workers.
        $this->close();

        foreach ($this->workers as $member) {
            $member->kill();
        }
    }

    /**
     * Marks the pool as not running and rejects every task still waiting for a worker.
     */
    private function close()
    {
        $this->running = false;

        // A single exception instance is shared by all cancelled waiters.
        $reason = new WorkerException('Worker pool was shut down.');

        for (; !$this->busyQueue->isEmpty();) {
            /** @var \Icicle\Awaitable\Delayed $waiter */
            $waiter = $this->busyQueue->dequeue();
            $waiter->cancel($reason);
        }
    }

    /**
     * Builds a worker through the factory, starts it and registers it with the pool.
     *
     * @return Worker The worker created.
     */
    private function createWorker()
    {
        $spawned = $this->factory->create();

        // Start before attaching, so a worker that fails to start is never tracked by the pool.
        $spawned->start();
        $this->workers->attach($spawned);

        return $spawned;
    }
}