diff --git a/lib/Worker/DefaultPool.php b/lib/Worker/DefaultPool.php index 1b218d0..f8c2862 100644 --- a/lib/Worker/DefaultPool.php +++ b/lib/Worker/DefaultPool.php @@ -84,13 +84,6 @@ final class DefaultPool implements Pool }; } - public function __destruct() - { - if ($this->isRunning()) { - $this->kill(); - } - } - /** * Checks if the pool is running. * @@ -189,7 +182,10 @@ final class DefaultPool implements Pool $this->running = false; foreach ($this->workers as $worker) { - $worker->kill(); + \assert($worker instanceof Worker); + if ($worker->isRunning()) { + $worker->kill(); + } } } diff --git a/lib/Worker/Internal/WorkerProcess.php b/lib/Worker/Internal/WorkerProcess.php index 39184e4..19a28a3 100644 --- a/lib/Worker/Internal/WorkerProcess.php +++ b/lib/Worker/Internal/WorkerProcess.php @@ -53,7 +53,9 @@ class WorkerProcess implements Context public function kill() { - $this->process->kill(); + if ($this->process->isRunning()) { + $this->process->kill(); + } } public function join(): Promise diff --git a/lib/Worker/TaskWorker.php b/lib/Worker/TaskWorker.php index 58614e1..5ac6824 100644 --- a/lib/Worker/TaskWorker.php +++ b/lib/Worker/TaskWorker.php @@ -15,6 +15,8 @@ use function Amp\call; */ abstract class TaskWorker implements Worker { + const SHUTDOWN_TIMEOUT = 1000; + /** @var \Amp\Parallel\Context\Context */ private $context; @@ -34,6 +36,22 @@ abstract class TaskWorker implements Worker } $this->context = $context; + + $context = &$this->context; + \register_shutdown_function(static function () use (&$context) { + if ($context === null || !$context->isRunning()) { + return; + } + + try { + Promise\wait(Promise\timeout(call(function () use ($context) { + yield $context->send(0); + return yield $context->join(); + }), self::SHUTDOWN_TIMEOUT)); + } catch (\Throwable $exception) { + $context->kill(); + } + }); } /** @@ -84,6 +102,7 @@ abstract class TaskWorker implements Worker yield $this->context->send($job); $result = yield $this->context->receive(); } catch (ChannelException $exception) { + $this->kill(); throw new WorkerException("Communicating with the worker failed", 0, $exception); } @@ -118,7 +137,7 @@ abstract class TaskWorker implements Worker return $this->exitStatus; } - if (!$this->context->isRunning()) { + if ($this->context === null || !$this->context->isRunning()) { return $this->exitStatus = new Success(0); } @@ -129,7 +148,15 @@ abstract class TaskWorker implements Worker } yield $this->context->send(0); - return yield $this->context->join(); + + try { + return yield Promise\timeout($this->context->join(), self::SHUTDOWN_TIMEOUT); + } catch (\Throwable $exception) { + $this->context->kill(); + throw new WorkerException("Failed to gracefully shutdown worker", 0, $exception); + } finally { + $this->context = null; // Null property to free memory because shutdown function has reference to context. + } }); } @@ -138,7 +165,7 @@ abstract class TaskWorker implements Worker */ public function kill() { - if ($this->exitStatus) { + if ($this->exitStatus || $this->context === null) { return; } @@ -149,5 +176,6 @@ abstract class TaskWorker implements Worker } $this->exitStatus = new Success(0); + $this->context = null; // Null property to free memory because shutdown function has reference to context. } }