1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Remove Worker::start()

The context can automatically be started when a job is enqueued.
This commit is contained in:
Aaron Piotrowski 2017-12-13 14:14:31 -06:00
parent 2a09f82f39
commit 312aecf1ff
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
12 changed files with 37 additions and 140 deletions

View File

@ -25,7 +25,6 @@ Loop::run(function () use (&$results, &$tasks) {
Loop::unreference($timer);
$pool = new DefaultPool;
$pool->start();
$coroutines = [];

View File

@ -9,7 +9,6 @@ Amp\Loop::run(function () {
$factory = new DefaultWorkerFactory();
$worker = $factory->create();
$worker->start();
$result = yield $worker->enqueue(new BlockingTask('file_get_contents', 'https://google.com'));
printf("Read %d bytes\n", strlen($result));

View File

@ -7,6 +7,7 @@ use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\ContextException;
use Amp\Parallel\Context\StatusError;
use Amp\Promise;
use Amp\Success;
use function Amp\call;
/**
@ -79,25 +80,18 @@ abstract class AbstractWorker implements Worker {
return empty($this->jobQueue);
}
/**
* {@inheritdoc}
*/
public function start() {
$this->context->start();
}
/**
* {@inheritdoc}
*/
public function enqueue(Task $task): Promise {
if (!$this->context->isRunning()) {
throw new StatusError("The worker has not been started");
}
if ($this->shutdown) {
throw new StatusError("The worker has been shut down");
}
if (!$this->context->isRunning()) {
$this->context->start();
}
$empty = empty($this->jobQueue);
$job = new Internal\Job($task);
@ -116,12 +110,16 @@ abstract class AbstractWorker implements Worker {
* {@inheritdoc}
*/
public function shutdown(): Promise {
if (!$this->context->isRunning() || $this->shutdown) {
if ($this->shutdown) {
throw new StatusError("The worker is not running");
}
$this->shutdown = true;
if (!$this->context->isRunning()) {
return new Success(0);
}
return call(function () {
if (!empty($this->jobQueue)) {
// If a task is currently running, wait for it to finish.
@ -158,6 +156,8 @@ abstract class AbstractWorker implements Worker {
$this->jobQueue = [];
}
$this->context->kill();
if ($this->context->isRunning()) {
$this->context->kill();
}
}
}

View File

@ -17,10 +17,7 @@ class DefaultPool implements Pool {
use CallableMaker;
/** @var bool Indicates if the pool is currently running. */
private $running = false;
/** @var int The minimum number of workers the pool should spawn. */
private $minSize;
private $running = true;
/** @var int The maximum number of workers the pool should spawn. */
private $maxSize;
@ -43,8 +40,6 @@ class DefaultPool implements Pool {
/**
* Creates a new worker pool.
*
* @param int $minSize The minimum number of workers the pool should spawn.
* Defaults to `Pool::DEFAULT_MIN_SIZE`.
* @param int $maxSize The maximum number of workers the pool should spawn.
* Defaults to `Pool::DEFAULT_MAX_SIZE`.
* @param \Amp\Parallel\Worker\WorkerFactory|null $factory A worker factory to be used to create
@ -52,21 +47,12 @@ class DefaultPool implements Pool {
*
* @throws \Error
*/
public function __construct(
int $minSize = self::DEFAULT_MIN_SIZE,
int $maxSize = self::DEFAULT_MAX_SIZE,
WorkerFactory $factory = null
) {
if ($minSize < 0) {
throw new \Error('Minimum size must be a non-negative integer.');
}
if ($maxSize < 0 || $maxSize < $minSize) {
throw new \Error('Maximum size must be a non-negative integer at least '.$minSize.'.');
public function __construct(int $maxSize = self::DEFAULT_MAX_SIZE, WorkerFactory $factory = null) {
if ($maxSize < 0) {
throw new \Error("Maximum size must be a non-negative integer");
}
$this->maxSize = $maxSize;
$this->minSize = $minSize;
// Use the global factory if none is given.
$this->factory = $factory ?: factory();
@ -93,14 +79,7 @@ class DefaultPool implements Pool {
* @return bool True if the pool has at least one idle worker, otherwise false.
*/
public function isIdle(): bool {
return $this->idleWorkers->count() > 0;
}
/**
* {@inheritdoc}
*/
public function getMinSize(): int {
return $this->minSize;
return $this->idleWorkers->count() > 0 || $this->workers->count() === 0;
}
/**
@ -124,27 +103,6 @@ class DefaultPool implements Pool {
return $this->idleWorkers->count();
}
/**
* Starts the worker pool execution.
*
* When the worker pool starts up, the minimum number of workers will be created. This adds some overhead to
* starting the pool, but allows for greater performance during runtime.
*/
public function start() {
if ($this->isRunning()) {
throw new StatusError('The worker pool has already been started.');
}
// Start up the pool with the minimum number of workers.
$count = $this->minSize;
while (--$count >= 0) {
$worker = $this->createWorker();
$this->idleWorkers->enqueue($worker);
}
$this->running = true;
}
/**
* Enqueues a task to be executed by the worker pool.
*
@ -152,7 +110,7 @@ class DefaultPool implements Pool {
*
* @return \Amp\Promise<mixed> The return value of Task::run().
*
* @throws \Amp\Parallel\Context\StatusError If the pool has not been started.
* @throws \Amp\Parallel\Context\StatusError If the pool has been shutdown.
* @throws \Amp\Parallel\Worker\TaskException If the task throws an exception.
*/
public function enqueue(Task $task): Promise {
@ -174,7 +132,7 @@ class DefaultPool implements Pool {
*/
public function shutdown(): Promise {
if (!$this->isRunning()) {
throw new StatusError('The pool is not running.');
throw new StatusError("The pool was shutdown");
}
$this->running = false;
@ -207,8 +165,6 @@ class DefaultPool implements Pool {
*/
private function createWorker() {
$worker = $this->factory->create();
$worker->start();
$this->workers->attach($worker, 0);
return $worker;
}
@ -228,7 +184,7 @@ class DefaultPool implements Pool {
*/
protected function pull(): Worker {
if (!$this->isRunning()) {
throw new StatusError("The queue is not running");
throw new StatusError("The pool was shutdown");
}
do {
@ -239,6 +195,7 @@ class DefaultPool implements Pool {
} else {
// Max worker count has not been reached, so create another worker.
$worker = $this->createWorker();
break;
}
} else {
// Shift a worker off the idle queue.
@ -266,9 +223,7 @@ class DefaultPool implements Pool {
* @throws \Error If the worker was not part of this queue.
*/
protected function push(Worker $worker) {
if (!$this->workers->contains($worker)) {
throw new \Error("The provided worker was not part of this queue");
}
\assert($this->workers->contains($worker), "The provided worker was not part of this queue");
if (($this->workers[$worker] -= 1) === 0) {
// Worker is completely idle, remove from busy queue and add to idle queue.

View File

@ -44,13 +44,6 @@ class PooledWorker implements Worker {
return $this->worker->isIdle();
}
/**
* {@inheritdoc}
*/
public function start() {
$this->worker->start();
}
/**
* {@inheritdoc}
*/

View File

@ -6,9 +6,6 @@ namespace Amp\Parallel\Worker;
* An interface for worker pools.
*/
interface Pool extends Worker {
/** @var int The default minimum pool size. */
const DEFAULT_MIN_SIZE = 1;
/** @var int The default maximum pool size. */
const DEFAULT_MAX_SIZE = 32;
@ -36,13 +33,6 @@ interface Pool extends Worker {
*/
public function getIdleWorkerCount(): int;
/**
* Gets the minimum number of workers the pool may have idle.
*
* @return int The minimum number of workers.
*/
public function getMinSize(): int;
/**
* Gets the maximum number of workers the pool may spawn to handle concurrent tasks.
*

View File

@ -22,11 +22,6 @@ interface Worker {
*/
public function isIdle(): bool;
/**
* Starts the context execution.
*/
public function start();
/**
* Enqueues a task to be executed by the worker.
*

View File

@ -9,7 +9,7 @@ const LOOP_POOL_IDENTIFIER = Pool::class;
const LOOP_FACTORY_IDENTIFIER = WorkerFactory::class;
/**
* Gets or sets the global worker pool. The pool is started if it is not already running.
* Gets or sets the global worker pool.
*
* @param \Amp\Parallel\Worker\Pool|null $pool A worker pool instance.
*
@ -25,10 +25,6 @@ function pool(Pool $pool = null): Pool {
$pool = new DefaultPool;
}
if (!$pool->isRunning()) {
$pool->start();
}
Loop::setState(LOOP_POOL_IDENTIFIER, $pool);
return $pool;
}
@ -59,9 +55,7 @@ function get(): Worker {
* @return \Amp\Parallel\Worker\Worker
*/
function create(): Worker {
$worker = factory()->create();
$worker->start();
return $worker;
return factory()->create();
}
/**

View File

@ -13,14 +13,12 @@ abstract class AbstractPoolTest extends TestCase {
*
* @return \Amp\Parallel\Worker\Pool
*/
abstract protected function createPool($min = null, $max = null): Pool;
abstract protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool;
public function testIsRunning() {
Loop::run(function () {
$pool = $this->createPool();
$this->assertFalse($pool->isRunning());
$pool->start();
$this->assertTrue($pool->isRunning());
yield $pool->shutdown();
@ -31,7 +29,6 @@ abstract class AbstractPoolTest extends TestCase {
public function testIsIdleOnStart() {
Loop::run(function () {
$pool = $this->createPool();
$pool->start();
$this->assertTrue($pool->isIdle());
@ -39,33 +36,16 @@ abstract class AbstractPoolTest extends TestCase {
});
}
public function testGetMinSize() {
$pool = $this->createPool(7, 24);
$this->assertEquals(7, $pool->getMinSize());
}
public function testGetMaxSize() {
$pool = $this->createPool(3, 17);
$pool = $this->createPool(17);
$this->assertEquals(17, $pool->getMaxSize());
}
public function testMinWorkersSpawnedOnStart() {
Loop::run(function () {
$pool = $this->createPool(8, 32);
$pool->start();
$this->assertEquals(8, $pool->getWorkerCount());
yield $pool->shutdown();
});
}
public function testWorkersIdleOnStart() {
Loop::run(function () {
$pool = $this->createPool(8, 32);
$pool->start();
$pool = $this->createPool(32);
$this->assertEquals(8, $pool->getIdleWorkerCount());
$this->assertEquals(0, $pool->getIdleWorkerCount());
yield $pool->shutdown();
});
@ -74,7 +54,6 @@ abstract class AbstractPoolTest extends TestCase {
public function testEnqueue() {
Loop::run(function () {
$pool = $this->createPool();
$pool->start();
$returnValue = yield $pool->enqueue(new TestTask(42));
$this->assertEquals(42, $returnValue);
@ -86,7 +65,6 @@ abstract class AbstractPoolTest extends TestCase {
public function testEnqueueMultiple() {
Loop::run(function () {
$pool = $this->createPool();
$pool->start();
$values = yield \Amp\Promise\all([
$pool->enqueue(new TestTask(42)),
@ -102,7 +80,6 @@ abstract class AbstractPoolTest extends TestCase {
public function testKill() {
$pool = $this->createPool();
$pool->start();
$this->assertRunTimeLessThan([$pool, 'kill'], 1000);
$this->assertFalse($pool->isRunning());

View File

@ -23,7 +23,6 @@ abstract class AbstractWorkerTest extends TestCase {
public function testWorkerConstantDefined() {
Loop::run(function () {
$worker = $this->createWorker();
$worker->start();
$this->assertTrue(yield $worker->enqueue(new ConstantTask));
yield $worker->shutdown();
});
@ -34,7 +33,8 @@ abstract class AbstractWorkerTest extends TestCase {
$worker = $this->createWorker();
$this->assertFalse($worker->isRunning());
$worker->start();
$worker->enqueue(new TestTask(42)); // Enqueue a task to start the worker.
$this->assertTrue($worker->isRunning());
yield $worker->shutdown();
@ -45,7 +45,6 @@ abstract class AbstractWorkerTest extends TestCase {
public function testIsIdleOnStart() {
Loop::run(function () {
$worker = $this->createWorker();
$worker->start();
$this->assertTrue($worker->isIdle());
@ -56,7 +55,6 @@ abstract class AbstractWorkerTest extends TestCase {
public function testEnqueue() {
Loop::run(function () {
$worker = $this->createWorker();
$worker->start();
$returnValue = yield $worker->enqueue(new TestTask(42));
$this->assertEquals(42, $returnValue);
@ -68,7 +66,6 @@ abstract class AbstractWorkerTest extends TestCase {
public function testEnqueueMultipleSynchronous() {
Loop::run(function () {
$worker = $this->createWorker();
$worker->start();
$values = yield \Amp\Promise\all([
$worker->enqueue(new TestTask(42)),
@ -85,7 +82,6 @@ abstract class AbstractWorkerTest extends TestCase {
public function testEnqueueMultipleAsynchronous() {
Loop::run(function () {
$worker = $this->createWorker();
$worker->start();
$promises = [
$worker->enqueue(new TestTask(42, 200)),
@ -107,7 +103,6 @@ abstract class AbstractWorkerTest extends TestCase {
public function testNotIdleOnEnqueue() {
Loop::run(function () {
$worker = $this->createWorker();
$worker->start();
$coroutine = $worker->enqueue(new TestTask(42));
$this->assertFalse($worker->isIdle());
@ -119,7 +114,8 @@ abstract class AbstractWorkerTest extends TestCase {
public function testKill() {
$worker = $this->createWorker();
$worker->start();
$worker->enqueue(new TestTask(42));
$this->assertRunTimeLessThan([$worker, 'kill'], 250);
$this->assertFalse($worker->isRunning());
@ -128,7 +124,6 @@ abstract class AbstractWorkerTest extends TestCase {
public function testUnserializableTask() {
Loop::run(function () {
$worker = $this->createWorker();
$worker->start();
try {
yield $worker->enqueue(new NonAutoloadableTask);

View File

@ -11,12 +11,12 @@ use Amp\Parallel\Worker\WorkerProcess;
* @group process
*/
class ProcessPoolTest extends AbstractPoolTest {
protected function createPool($min = Pool::DEFAULT_MIN_SIZE, $max =Pool::DEFAULT_MAX_SIZE): Pool {
protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool {
$factory = $this->createMock(WorkerFactory::class);
$factory->method('create')->will($this->returnCallback(function () {
return new WorkerProcess;
}));
return new DefaultPool($min, $max, $factory);
return new DefaultPool($max, $factory);
}
}

View File

@ -12,12 +12,12 @@ use Amp\Parallel\Worker\WorkerThread;
* @requires extension pthreads
*/
class ThreadPoolTest extends AbstractPoolTest {
protected function createPool($min = Pool::DEFAULT_MIN_SIZE, $max = Pool::DEFAULT_MAX_SIZE): Pool {
protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool {
$factory = $this->createMock(WorkerFactory::class);
$factory->method('create')->will($this->returnCallback(function () {
return new WorkerThread;
}));
return new DefaultPool($min, $max, $factory);
return new DefaultPool($max, $factory);
}
}