mirror of
https://github.com/danog/parallel.git
synced 2025-01-22 22:11:11 +01:00
Only check for exit when receiving, fixes #9
This commit is contained in:
parent
84d405db3f
commit
a31f3491ca
@ -143,19 +143,21 @@ class Thread implements Strand {
|
||||
|
||||
list($channel, $this->socket) = $sockets;
|
||||
|
||||
$this->thread = $thread = new Internal\Thread($this->socket, $this->function, $this->args);
|
||||
$this->thread = new Internal\Thread($this->socket, $this->function, $this->args);
|
||||
|
||||
if (!$this->thread->start(PTHREADS_INHERIT_INI)) {
|
||||
throw new ContextException('Failed to start the thread.');
|
||||
}
|
||||
|
||||
$this->channel = $channel = new ChannelledSocket($channel, $channel);
|
||||
$this->channel = new ChannelledSocket($channel, $channel);
|
||||
|
||||
$this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function () use ($thread, $channel) {
|
||||
if (!$thread->isRunning()) {
|
||||
$channel->close();
|
||||
$this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, function () {
|
||||
if (!$this->thread->isRunning()) {
|
||||
$this->channel->close();
|
||||
}
|
||||
});
|
||||
|
||||
Loop::disable($this->watcher);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -200,6 +202,7 @@ class Thread implements Strand {
|
||||
*
|
||||
* @throws StatusError Thrown if the context has not been started.
|
||||
* @throws SynchronizationError Thrown if an exit status object is not received.
|
||||
* @throws ContextException If the context stops responding.
|
||||
*/
|
||||
public function join(): Promise {
|
||||
if ($this->channel == null || $this->thread === null) {
|
||||
@ -214,9 +217,12 @@ class Thread implements Strand {
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @throws \Amp\Parallel\SynchronizationError If the thread does not send an exit status.
|
||||
* @throws SynchronizationError If the thread does not send an exit status.
|
||||
* @throws ContextException If the context stops responding.
|
||||
*/
|
||||
private function doJoin(): \Generator {
|
||||
Loop::enable($this->watcher);
|
||||
|
||||
try {
|
||||
$response = yield $this->channel->receive();
|
||||
|
||||
@ -232,6 +238,7 @@ class Thread implements Strand {
|
||||
$this->kill();
|
||||
throw $exception;
|
||||
} finally {
|
||||
Loop::disable($this->watcher);
|
||||
$this->close();
|
||||
}
|
||||
|
||||
@ -250,12 +257,16 @@ class Thread implements Strand {
|
||||
}
|
||||
|
||||
private function doReceive() {
|
||||
Loop::enable($this->watcher);
|
||||
|
||||
try {
|
||||
$data = yield $this->channel->receive();
|
||||
} catch (ChannelException $exception) {
|
||||
throw new ContextException(
|
||||
"The context stopped responding, potentially due to a fatal error or calling exit", 0, $exception
|
||||
);
|
||||
} finally {
|
||||
Loop::disable($this->watcher);
|
||||
}
|
||||
|
||||
if ($data instanceof ExitResult) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user