1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Refactor DefaultPool

This commit is contained in:
Aaron Piotrowski 2016-08-30 17:36:21 -05:00
parent 3d2964ff10
commit 046f7defb8

View File

@ -2,8 +2,7 @@
namespace Amp\Parallel\Worker;
use Amp;
use Amp\Coroutine;
use Amp\{ CallableMaker, Coroutine };
use Amp\Parallel\StatusError;
use Interop\Async\Awaitable;
@ -15,6 +14,8 @@ use Interop\Async\Awaitable;
* are completed as soon as possible and workers are used efficiently.
*/
class DefaultPool implements Pool {
use CallableMaker;
/** @var bool Indicates if the pool is currently running. */
private $running = false;
@ -73,13 +74,7 @@ class DefaultPool implements Pool {
$this->idleWorkers = new \SplQueue;
$this->busyQueue = new \SplQueue;
if (PHP_VERSION_ID >= 70100) {
$this->push = \Closure::fromCallable([$this, 'push']);
} else {
$this->push = function (Worker $worker) {
$this->push($worker);
};
}
$this->push = $this->callableFromInstanceMethod("push");
}
/**
@ -152,8 +147,6 @@ class DefaultPool implements Pool {
/**
* Enqueues a task to be executed by the worker pool.
*
* @coroutine
*
* @param Task $task The task to enqueue.
*
* @return \Interop\Async\Awaitable<mixed> The return value of Task::run().
@ -162,8 +155,26 @@ class DefaultPool implements Pool {
* @throws \Amp\Parallel\TaskException If the task throws an exception.
*/
public function enqueue(Task $task): Awaitable {
$worker = $this->get();
return $worker->enqueue($task);
return new Coroutine($this->doEnqueue($task));
}
/**
* @coroutine
*
* @param \Amp\Parallel\Worker\Task $task
*
* @return \Generator
*/
public function doEnqueue(Task $task): \Generator {
$worker = $this->pull();
try {
$result = yield $worker->enqueue($task);
} finally {
$this->push($worker);
}
return $result;
}
/**
@ -203,7 +214,7 @@ class DefaultPool implements Pool {
}
}
return yield Amp\all($shutdowns);
return yield \Amp\all($shutdowns);
}
/**
@ -234,6 +245,16 @@ class DefaultPool implements Pool {
* {@inheritdoc}
*/
public function get(): Worker {
return new Internal\PooledWorker($this->pull(), $this->push);
}
/**
* Pulls a worker from the pool. The worker should be put back into the pool with push() to be marked as idle.
*
* @return \Amp\Parallel\Worker\Worker
* @throws \Amp\Parallel\StatusError
*/
protected function pull(): Worker {
if (!$this->isRunning()) {
throw new StatusError("The queue is not running");
}
@ -261,8 +282,8 @@ class DefaultPool implements Pool {
$this->busyQueue->push($worker);
$this->workers[$worker] += 1;
return new Internal\PooledWorker($worker, $this->push);
return $worker;
}
/**
@ -272,7 +293,7 @@ class DefaultPool implements Pool {
*
* @throws \Error If the worker was not part of this queue.
*/
private function push(Worker $worker) {
protected function push(Worker $worker) {
if (!$this->workers->contains($worker)) {
throw new \Error("The provided worker was not part of this queue");
}