1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

No longer call kill() in destructors

This means forks/processes/threads must be properly shutdown or killed before a reference is lost or the thread will continue to run indefinitely. This was necessary because forks were automatically killing other processes or threads due to calling kill() in the destructor.
This commit is contained in:
Aaron Piotrowski 2015-09-14 18:59:33 -05:00
parent 41ec732092
commit 7def2ae5a3
5 changed files with 49 additions and 39 deletions

View File

@ -68,11 +68,6 @@ class Fork implements ContextInterface
$this->channel = null;
}
public function __destruct()
{
$this->kill(); // Will only terminate if the process is still running.
}
/**
* Checks if the context is running.
*

View File

@ -84,26 +84,6 @@ class Process
$this->options = $options;
}
/**
* Stops the process if it is still running.
*/
public function __destruct()
{
$this->kill(); // Will only terminate if the process is still running.
if (null !== $this->stdin) {
$this->stdin->close();
}
if (null !== $this->stdout) {
$this->stdout->close();
}
if (null !== $this->stderr) {
$this->stderr->close();
}
}
/**
* Resets process values.
*/

View File

@ -156,16 +156,6 @@ class Thread implements ContextInterface
}
}
/**
* Kills the thread if it is still running.
*
* @throws \Icicle\Concurrent\Exception\ThreadException
*/
public function __destruct()
{
$this->kill();
}
/**
* Closes channel and socket if still open.
*/

View File

@ -20,7 +20,7 @@ class Pool implements PoolInterface
/**
* @var int The default minimum pool size.
*/
const DEFAULT_MIN_SIZE = 8;
const DEFAULT_MIN_SIZE = 4;
/**
* @var int The default maximum pool size.
@ -159,6 +159,10 @@ class Pool implements PoolInterface
*/
public function start()
{
if ($this->isRunning()) {
throw new StatusError('The worker pool has already been started.');
}
// Start up the pool with the minimum number of workers.
$count = $this->minSize;
while (--$count >= 0) {
@ -179,6 +183,9 @@ class Pool implements PoolInterface
* @return \Generator
*
* @resolve mixed The return value of the task.
*
* @throws \Icicle\Concurrent\Exception\StatusError If the pool has not been started.
* @throws \Icicle\Concurrent\Exception\TaskException If the task throws an exception.
*/
public function enqueue(TaskInterface $task)
{
@ -189,9 +196,15 @@ class Pool implements PoolInterface
/** @var \Icicle\Concurrent\Worker\Worker $worker */
$worker = (yield $this->getWorker());
$result = (yield $worker->enqueue($task));
$this->pushWorker($worker);
try {
$result = (yield $worker->enqueue($task));
} finally {
if ($worker->isRunning()) {
$this->pushWorker($worker);
} else { // Worker crashed, spin up a new worker.
$this->pushWorker($this->createWorker());
}
}
yield $result;
}
@ -202,6 +215,8 @@ class Pool implements PoolInterface
* @coroutine
*
* @return \Generator
*
* @throws \Icicle\Concurrent\Exception\StatusError If the pool has not been started.
*/
public function shutdown()
{

30
tests/Worker/PoolTest.php Normal file
View File

@ -0,0 +1,30 @@
<?php
namespace Icicle\Tests\Concurrent\Worker;
use Icicle\Concurrent\Worker\Pool;
use Icicle\Coroutine;
use Icicle\Loop;
use Icicle\Tests\Concurrent\TestCase;
class PoolTest extends TestCase
{
public function createPool($min = 8, $max = 32)
{
return new Pool($min, $max);
}
public function testEnqueue()
{
Coroutine\create(function () {
$pool = $this->createPool();
$pool->start();
$returnValue = (yield $pool->enqueue(new TestTask(42)));
$this->assertEquals(42, $returnValue);
yield $pool->shutdown();
})->done();
Loop\run();
}
}