process = new Process($command, $cwd, $env, $options);
        $this->stdoutEmitter = new Emitter;
        $this->stderrEmitter = new Emitter;
        $this->writeQueue = new \SplQueue;
    }

    /**
     * Resets process values.
     *
     * The clone receives its own copy of the wrapped process, new stdout/stderr emitters, an empty write queue,
     * and no watchers or join promise.
     */
    public function __clone()
    {
        $this->process = clone $this->process;
        $this->stdinWatcher = null;
        $this->stdoutWatcher = null;
        $this->stderrWatcher = null;
        $this->stdoutEmitter = new Emitter;
        $this->stderrEmitter = new Emitter;
        $this->writeQueue = new \SplQueue;
        $this->promise = null;
    }

    /**
     * {@inheritdoc}
     *
     * Starts the wrapped process, then registers one writable watcher on its STDIN (kept disabled until data is
     * queued by write()) and one readable watcher each on STDOUT and STDERR. When the join promise of the wrapped
     * process settles, all three watchers are cancelled and both emitters are failed or resolved accordingly.
     */
    public function start()
    {
        $this->process->start();

        $process = $this->process;
        $writes = $this->writeQueue;

        // Drains the write queue while STDIN accepts data. Each queue entry is
        // [remaining data, bytes already written for this write, Deferred to settle].
        $this->stdinWatcher = Loop::onWritable($this->process->getStdin(), static function ($watcher, $resource) use (
            $process, $writes
        ) {
            try {
                while (!$writes->isEmpty()) {
                    /** @var \Amp\Deferred $deferred */
                    list($data, $previous, $deferred) = $writes->shift();
                    $length = \strlen($data);

                    if ($length === 0) {
                        $deferred->resolve(0);
                        continue;
                    }

                    // Forget any earlier error so that error_get_last() below can only describe this fwrite() call.
                    \error_clear_last();

                    // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the
                    // buffer is full.
                    $written = @\fwrite($resource, $data);

                    if ($written === false || $written === 0) {
                        $message = "Failed to write to STDIN";
                        if ($error = \error_get_last()) {
                            $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
                        }

                        $exception = new ProcessException($message);
                        $deferred->fail($exception);

                        // Empty the write queue and fail all Deferreds.
                        while (!$writes->isEmpty()) {
                            list(, , $deferred) = $writes->shift();
                            $deferred->fail($exception);
                        }

                        $process->kill();
                        return;
                    }

                    if ($length <= $written) {
                        // The whole chunk was accepted; report the total across all partial writes.
                        $deferred->resolve($written + $previous);
                        continue;
                    }

                    // Partial write: put the remainder back at the front of the queue and wait for the next
                    // writable notification.
                    $data = \substr($data, $written);
                    $writes->unshift([$data, $written + $previous, $deferred]);
                    return;
                }
            } finally {
                // Nothing left to write, so stop listening for writability until write() queues more data.
                if ($writes->isEmpty()) {
                    Loop::disable($watcher);
                }
            }
        });

        // The queue is empty at this point; write() enables the watcher once it has queued data.
        Loop::disable($this->stdinWatcher);

        // Shared read callback for STDOUT and STDERR; the emitter to feed is passed as the watcher's callback data.
        $callback = static function ($watcher, $resource, Emitter $emitter) {
            // Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes.
            if (@\feof($resource) || ($data = @\fread($resource, self::CHUNK_SIZE)) === false) {
                Loop::disable($watcher);
                return;
            }

            if ($data !== "") {
                $emitter->emit($data);
            }
        };

        $this->stdoutWatcher = Loop::onReadable($this->process->getStdout(), $callback, $this->stdoutEmitter);
        $this->stderrWatcher = Loop::onReadable($this->process->getStderr(), $callback, $this->stderrEmitter);

        $this->promise = $this->process->join();
        $this->promise->when(function (\Throwable $exception = null, int $code = null) {
            Loop::cancel($this->stdinWatcher);
            Loop::cancel($this->stdoutWatcher);
            Loop::cancel($this->stderrWatcher);

            if ($exception) {
                $this->stdoutEmitter->fail($exception);
                $this->stderrEmitter->fail($exception);
                return;
            }

            // $code is whatever the join promise resolved with (presumably the exit code).
            $this->stdoutEmitter->resolve($code);
            $this->stderrEmitter->resolve($code);
        });
    }

    /**
     * {@inheritdoc}
     */
    public function isRunning(): bool
    {
        return $this->process->isRunning();
    }

    /**
     * Writes data to the STDIN of the process.
     *
     * If no earlier write is pending, the data is written immediately; anything that could not be written at once
     * (or any write issued while others are pending) is queued and flushed by the STDIN watcher.
     *
     * @param string $data
     *
     * @return \AsyncInterop\Promise Resolved with the number of bytes written, or failed with a ProcessException
     *     if the write fails.
     */
    public function write(string $data): Promise
    {
        $length = \strlen($data);
        $written = 0;

        if ($this->writeQueue->isEmpty()) {
            if ($length === 0) {
                return new Success(0);
            }

            $stdin = $this->process->getStdin();

            // Forget any earlier error so that error_get_last() below can only describe this fwrite() call.
            \error_clear_last();

            // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
            $written = @\fwrite($stdin, $data);

            if ($written === false) {
                $message = "Failed to write to stream";
                if ($error = \error_get_last()) {
                    $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
                }

                return new Failure(new ProcessException($message));
            }

            if ($length <= $written) {
                return new Success($written);
            }

            // Only part of the data was accepted; queue the remainder.
            $data = \substr($data, $written);
        }

        $deferred = new Deferred;
        $this->writeQueue->push([$data, $written, $deferred]);
        Loop::enable($this->stdinWatcher);

        return $deferred->promise();
    }

    /**
     * @return Stream Stream of the emitter that receives the chunks read from the STDOUT of the process.
     */
    public function getStdout(): Stream
    {
        return $this->stdoutEmitter->stream();
    }

    /**
     * @return Stream Stream of the emitter that receives the chunks read from the STDERR of the process.
     */
    public function getStderr(): Stream
    {
        return $this->stderrEmitter->stream();
    }

    /**
     * {@inheritdoc}
     *
     * @throws StatusError If the process has not been started, i.e. no join promise exists yet.
     */
    public function join(): Promise
    {
        if ($this->promise === null) {
            throw new StatusError("The process has not been started");
        }

        return $this->promise;
    }

    /**
     * {@inheritdoc}
     */
    public function kill()
    {
        $this->process->kill();
    }

    /**
     * {@inheritdoc}
     */
    public function getPid(): int
    {
        return $this->process->getPid();
    }

    /**
     * {@inheritdoc}
     */
    public function signal(int $signo)
    {
        $this->process->signal($signo);
    }
}