mirror of
https://github.com/danog/parallel.git
synced 2024-11-30 04:39:01 +01:00
Remove reference to process object from watcher
This commit is contained in:
parent
8126b8e3f2
commit
fdb4a81600
@ -18,8 +18,8 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
* @param string $cwd Working directory.
|
||||
* @param mixed[] $env Array of environment variables.
|
||||
*/
|
||||
public function __construct(string $path, string $cwd = '', array $env = []) {
|
||||
$command = \PHP_BINARY . ' ' . $path;
|
||||
public function __construct(string $path, string $cwd = "", array $env = []) {
|
||||
$command = \PHP_BINARY . " " . $path;
|
||||
$this->process = new Process($command, $cwd, $env);
|
||||
}
|
||||
|
||||
@ -51,14 +51,14 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
*/
|
||||
public function receive(): Awaitable {
|
||||
if ($this->channel === null) {
|
||||
throw new StatusError('The process has not been started.');
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return \Amp\pipe($this->channel->receive(), static function ($data) {
|
||||
if ($data instanceof ExitStatus) {
|
||||
$data = $data->getResult();
|
||||
throw new SynchronizationError(\sprintf(
|
||||
'Thread unexpectedly exited with result of type: %s',
|
||||
"Thread unexpectedly exited with result of type: %s",
|
||||
\is_object($data) ? \get_class($data) : \gettype($data)
|
||||
));
|
||||
}
|
||||
@ -72,11 +72,11 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
*/
|
||||
public function send($data): Awaitable {
|
||||
if ($this->channel === null) {
|
||||
throw new StatusError('The process has not been started.');
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
if ($data instanceof ExitStatus) {
|
||||
throw new \Error('Cannot send exit status objects.');
|
||||
throw new \Error("Cannot send exit status objects");
|
||||
}
|
||||
|
||||
return $this->channel->send($data);
|
||||
|
@ -16,7 +16,7 @@ class Process implements ProcessContext {
|
||||
private $command;
|
||||
|
||||
/** @var string */
|
||||
private $cwd = '';
|
||||
private $cwd = "";
|
||||
|
||||
/** @var array */
|
||||
private $env = [];
|
||||
@ -52,10 +52,10 @@ class Process implements ProcessContext {
|
||||
* @param mixed[] $env Environment variables or use an empty array to inherit from the current PHP process.
|
||||
* @param mixed[] $options Options for proc_open().
|
||||
*/
|
||||
public function __construct(string $command, string $cwd = '', array $env = [], array $options = []) {
|
||||
public function __construct(string $command, string $cwd = null, array $env = [], array $options = []) {
|
||||
$this->command = $command;
|
||||
|
||||
if ($cwd !== '') {
|
||||
if ($cwd !== null) {
|
||||
$this->cwd = $cwd;
|
||||
}
|
||||
|
||||
@ -108,27 +108,31 @@ class Process implements ProcessContext {
|
||||
* @throws \Amp\Parallel\StatusError If the process is already running.
|
||||
*/
|
||||
public function start() {
|
||||
if (null !== $this->deferred) {
|
||||
throw new StatusError('The process has already been started.');
|
||||
if ($this->deferred !== null) {
|
||||
throw new StatusError("The process has already been started");
|
||||
}
|
||||
|
||||
$this->deferred = new Deferred;
|
||||
$this->deferred = $deferred = new Deferred;
|
||||
|
||||
$fd = [
|
||||
['pipe', 'r'], // stdin
|
||||
['pipe', 'w'], // stdout
|
||||
['pipe', 'a'], // stderr
|
||||
['pipe', 'w'], // exit code pipe
|
||||
["pipe", "r"], // stdin
|
||||
["pipe", "w"], // stdout
|
||||
["pipe", "a"], // stderr
|
||||
["pipe", "w"], // exit code pipe
|
||||
];
|
||||
|
||||
$nd = \strncasecmp(\PHP_OS, 'WIN', 3) === 0 ? 'NUL' : '/dev/null';
|
||||
$nd = \strncasecmp(\PHP_OS, "WIN", 3) === 0 ? "NUL" : "/dev/null";
|
||||
|
||||
$command = \sprintf('(%s) 3>%s; code=$?; echo $code >&3; exit $code', $this->command, $nd);
|
||||
|
||||
$this->process = \proc_open($command, $fd, $pipes, $this->cwd ?: null, $this->env ?: null, $this->options);
|
||||
$this->process = @\proc_open($command, $fd, $pipes, $this->cwd ?: null, $this->env ?: null, $this->options);
|
||||
|
||||
if (!\is_resource($this->process)) {
|
||||
throw new ContextException('Could not start process.');
|
||||
$message = "Could not start process";
|
||||
if ($error = \error_get_last()) {
|
||||
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
|
||||
}
|
||||
throw new ContextException($message);
|
||||
}
|
||||
|
||||
$this->oid = \getmypid();
|
||||
@ -137,59 +141,53 @@ class Process implements ProcessContext {
|
||||
if (!$status) {
|
||||
\proc_close($this->process);
|
||||
$this->process = null;
|
||||
throw new ContextException('Could not get process status.');
|
||||
throw new ContextException("Could not get process status");
|
||||
}
|
||||
|
||||
$this->pid = $status['pid'];
|
||||
$this->pid = $status["pid"];
|
||||
|
||||
$this->stdin = new Socket($pipes[0]);
|
||||
$this->stdin = $stdin = new Socket($pipes[0]);
|
||||
$this->stdout = new Socket($pipes[1]);
|
||||
$this->stderr = new Socket($pipes[2]);
|
||||
|
||||
$stream = $pipes[3];
|
||||
\stream_set_blocking($stream, false);
|
||||
|
||||
$this->watcher = Loop::onReadable($stream, function ($watcher, $resource) {
|
||||
if (!\is_resource($resource) || \feof($resource)) {
|
||||
$this->close($resource);
|
||||
$this->deferred->fail(new ContextException('Process ended unexpectedly.'));
|
||||
} else {
|
||||
$code = \fread($resource, 1);
|
||||
$this->close($resource);
|
||||
if (!\strlen($code) || !\is_numeric($code)) {
|
||||
$this->deferred->fail(new ContextException('Process ended without providing a status code.'));
|
||||
} else {
|
||||
$this->deferred->resolve((int) $code);
|
||||
$process = &$this->process;
|
||||
|
||||
$this->watcher = Loop::onReadable($stream, static function ($watcher, $resource) use (
|
||||
&$process, $deferred, $stdin
|
||||
) {
|
||||
try {
|
||||
try {
|
||||
if (!\is_resource($resource) || \feof($resource)) {
|
||||
throw new ContextException("Process ended unexpectedly");
|
||||
}
|
||||
$code = @\fread($resource, 1);
|
||||
if (!\strlen($code) || !\is_numeric($code)) {
|
||||
throw new ContextException("Process ended without providing a status code");
|
||||
}
|
||||
} finally {
|
||||
if (\is_resource($resource)) {
|
||||
\fclose($resource);
|
||||
}
|
||||
if (\is_resource($process)) {
|
||||
\proc_close($process);
|
||||
$process = null;
|
||||
}
|
||||
$stdin->close();
|
||||
Loop::cancel($watcher);
|
||||
}
|
||||
|
||||
$deferred->resolve((int) $code);
|
||||
} catch (\Throwable $exception) {
|
||||
$deferred->fail($exception);
|
||||
}
|
||||
});
|
||||
|
||||
Loop::disable($this->watcher);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the stream resource provided, the open process handle, and stdin.
|
||||
*
|
||||
* @param resource $resource
|
||||
*/
|
||||
private function close($resource) {
|
||||
if (\is_resource($resource)) {
|
||||
\fclose($resource);
|
||||
}
|
||||
|
||||
if (\is_resource($this->process)) {
|
||||
\proc_close($this->process);
|
||||
$this->process = null;
|
||||
}
|
||||
|
||||
$this->stdin->close();
|
||||
|
||||
if ($this->watcher !== null) {
|
||||
Loop::cancel($this->watcher);
|
||||
$this->watcher = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Interop\Async\Awaitable<int> Resolves with exit status.
|
||||
*
|
||||
@ -197,7 +195,7 @@ class Process implements ProcessContext {
|
||||
*/
|
||||
public function join(): Awaitable {
|
||||
if ($this->deferred === null) {
|
||||
throw new StatusError('The process has not been started.');
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
Loop::enable($this->watcher);
|
||||
@ -224,7 +222,8 @@ class Process implements ProcessContext {
|
||||
$this->process = null;
|
||||
|
||||
Loop::cancel($this->watcher);
|
||||
$this->watcher = null;
|
||||
|
||||
$this->deferred->fail(new ContextException("The process was killed"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -237,7 +236,7 @@ class Process implements ProcessContext {
|
||||
*/
|
||||
public function signal(int $signo) {
|
||||
if (!$this->isRunning()) {
|
||||
throw new StatusError('The process is not running.');
|
||||
throw new StatusError("The process is not running");
|
||||
}
|
||||
|
||||
\proc_terminate($this->process, (int) $signo);
|
||||
@ -268,8 +267,8 @@ class Process implements ProcessContext {
|
||||
* @return string The current working directory or null if inherited from the current PHP process.
|
||||
*/
|
||||
public function getWorkingDirectory(): string {
|
||||
if ($this->cwd === '') {
|
||||
return \getcwd() ?: '';
|
||||
if ($this->cwd === "") {
|
||||
return \getcwd() ?: "";
|
||||
}
|
||||
|
||||
return $this->cwd;
|
||||
@ -311,7 +310,7 @@ class Process implements ProcessContext {
|
||||
*/
|
||||
public function getStdIn(): Stream {
|
||||
if ($this->stdin === null) {
|
||||
throw new StatusError('The process has not been started.');
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return $this->stdin;
|
||||
@ -326,7 +325,7 @@ class Process implements ProcessContext {
|
||||
*/
|
||||
public function getStdOut(): Stream {
|
||||
if ($this->stdout === null) {
|
||||
throw new StatusError('The process has not been started.');
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return $this->stdout;
|
||||
@ -341,7 +340,7 @@ class Process implements ProcessContext {
|
||||
*/
|
||||
public function getStdErr(): Stream {
|
||||
if ($this->stderr === null) {
|
||||
throw new StatusError('The process has not been started.');
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return $this->stderr;
|
||||
|
Loading…
Reference in New Issue
Block a user