1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Gracefully shutdown workers on process exit

This commit is contained in:
Aaron Piotrowski 2018-12-30 12:37:42 -06:00
parent 7a6e2aa8fd
commit d7697ebd69
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
3 changed files with 38 additions and 12 deletions

View File

@ -84,13 +84,6 @@ final class DefaultPool implements Pool
};
}
public function __destruct()
{
if ($this->isRunning()) {
$this->kill();
}
}
/**
* Checks if the pool is running.
*
@ -189,7 +182,10 @@ final class DefaultPool implements Pool
$this->running = false;
foreach ($this->workers as $worker) {
$worker->kill();
\assert($worker instanceof Worker);
if ($worker->isRunning()) {
$worker->kill();
}
}
}

View File

@ -53,7 +53,9 @@ class WorkerProcess implements Context
public function kill()
{
$this->process->kill();
if ($this->process->isRunning()) {
$this->process->kill();
}
}
public function join(): Promise

View File

@ -15,6 +15,8 @@ use function Amp\call;
*/
abstract class TaskWorker implements Worker
{
const SHUTDOWN_TIMEOUT = 1000;
/** @var \Amp\Parallel\Context\Context */
private $context;
@ -34,6 +36,22 @@ abstract class TaskWorker implements Worker
}
$this->context = $context;
$context = &$this->context;
\register_shutdown_function(static function () use (&$context) {
if ($context === null || !$context->isRunning()) {
return;
}
try {
Promise\wait(Promise\timeout(call(function () use ($context) {
yield $context->send(0);
return yield $context->join();
}), self::SHUTDOWN_TIMEOUT));
} catch (\Throwable $exception) {
$context->kill();
}
});
}
/**
@ -84,6 +102,7 @@ abstract class TaskWorker implements Worker
yield $this->context->send($job);
$result = yield $this->context->receive();
} catch (ChannelException $exception) {
$this->kill();
throw new WorkerException("Communicating with the worker failed", 0, $exception);
}
@ -118,7 +137,7 @@ abstract class TaskWorker implements Worker
return $this->exitStatus;
}
if (!$this->context->isRunning()) {
if ($this->context === null || !$this->context->isRunning()) {
return $this->exitStatus = new Success(0);
}
@ -129,7 +148,15 @@ abstract class TaskWorker implements Worker
}
yield $this->context->send(0);
return yield $this->context->join();
try {
return yield Promise\timeout($this->context->join(), self::SHUTDOWN_TIMEOUT);
} catch (\Throwable $exception) {
$this->context->kill();
throw new WorkerException("Failed to gracefully shutdown worker", 0, $exception);
} finally {
$this->context = null; // Null property to free memory because shutdown function has reference to context.
}
});
}
@ -138,7 +165,7 @@ abstract class TaskWorker implements Worker
*/
public function kill()
{
if ($this->exitStatus) {
if ($this->exitStatus || $this->context === null) {
return;
}
@ -149,5 +176,6 @@ abstract class TaskWorker implements Worker
}
$this->exitStatus = new Success(0);
$this->context = null; // Null property to free memory because shutdown function has reference to context.
}
}