server = \stream_socket_server(self::SERVER_SOCKET_URI, $errNo, $errStr, $flags); if (!$this->server) { throw new \Error("Failed to create TCP server socket for process wrapper: {$errNo}: {$errStr}"); } if (!\stream_set_blocking($this->server, false)) { throw new \Error("Failed to set server socket to non-blocking mode"); } list($this->address, $this->port) = \explode(':', \stream_socket_get_name($this->server, false)); $this->port = (int) $this->port; Loop::unreference(Loop::onReadable($this->server, [$this, 'onServerSocketReadable'])); } private function failClientHandshake($socket, int $code): void { \fwrite($socket, \chr(SignalCode::HANDSHAKE_ACK) . \chr($code)); \fclose($socket); unset($this->pendingClients[(int) $socket]); } private function failHandleStart(Handle $handle, string $message, ...$args) { Loop::cancel($handle->connectTimeoutWatcher); unset($this->pendingProcesses[$handle->wrapperPid]); foreach ($handle->sockets as $socket) { \fclose($socket); } $error = new ProcessException(\vsprintf($message, $args)); foreach ($handle->stdioDeferreds as $deferred) { $deferred->fail($error); } } /** * Read data from a client socket. * * This method cleans up internal state as appropriate. Returns null if the read fails or needs to be repeated. * * @param resource $socket * @param int $length * @param PendingSocketClient $state * * @return string|null */ private function readDataFromPendingClient($socket, int $length, PendingSocketClient $state) { $data = \fread($socket, $length); if ($data === false || $data === '') { return null; } $data = $state->receivedDataBuffer . $data; if (\strlen($data) < $length) { $state->receivedDataBuffer = $data; return null; } $state->receivedDataBuffer = ''; Loop::cancel($state->readWatcher); Loop::cancel($state->timeoutWatcher); return $data; } public function onReadableHandshake($watcher, $socket) { $socketId = (int) $socket; $pendingClient = $this->pendingClients[$socketId]; if (null === $data = $this->readDataFromPendingClient($socket, self::SECURITY_TOKEN_SIZE + 6, $pendingClient)) { return; } $packet = \unpack('Csignal/Npid/Cstream_id/a*client_token', $data); // validate the client's handshake if ($packet['signal'] !== SignalCode::HANDSHAKE) { $this->failClientHandshake($socket, HandshakeStatus::SIGNAL_UNEXPECTED); return; } if ($packet['stream_id'] > 2) { $this->failClientHandshake($socket, HandshakeStatus::INVALID_STREAM_ID); return; } if (!isset($this->pendingProcesses[$packet['pid']])) { $this->failClientHandshake($socket, HandshakeStatus::INVALID_PROCESS_ID); return; } $handle = $this->pendingProcesses[$packet['pid']]; if (isset($handle->sockets[$packet['stream_id']])) { $this->failClientHandshake($socket, HandshakeStatus::DUPLICATE_STREAM_ID); \trigger_error(\sprintf( "%s: Received duplicate socket for process #%s stream #%d", self::class, $handle->pid, $packet['stream_id'] ), E_USER_WARNING); return; } if (!\hash_equals($packet['client_token'], $handle->securityTokens[$packet['stream_id']])) { $this->failClientHandshake($socket, HandshakeStatus::INVALID_CLIENT_TOKEN); $this->failHandleStart($handle, "Invalid client security token for stream #%d", $packet['stream_id']); return; } $ackData = \chr(SignalCode::HANDSHAKE_ACK) . \chr(HandshakeStatus::SUCCESS) . $handle->securityTokens[$packet['stream_id'] + 3]; // Unless we set the security token size so high that it won't fit in the // buffer, this probably shouldn't ever happen unless something has gone wrong if (\fwrite($socket, $ackData) !== self::SECURITY_TOKEN_SIZE + 2) { unset($this->pendingClients[$socketId]); return; } $pendingClient->pid = $packet['pid']; $pendingClient->streamId = $packet['stream_id']; $pendingClient->readWatcher = Loop::onReadable($socket, [$this, 'onReadableHandshakeAck']); } public function onReadableHandshakeAck($watcher, $socket) { $socketId = (int) $socket; $pendingClient = $this->pendingClients[$socketId]; // can happen if the start promise was failed if (!isset($this->pendingProcesses[$pendingClient->pid])) { \fclose($socket); Loop::cancel($watcher); Loop::cancel($pendingClient->timeoutWatcher); unset($this->pendingClients[$socketId]); return; } if (null === $data = $this->readDataFromPendingClient($socket, 2, $pendingClient)) { return; } unset($this->pendingClients[$socketId]); $handle = $this->pendingProcesses[$pendingClient->pid]; $packet = \unpack('Csignal/Cstatus', $data); if ($packet['signal'] !== SignalCode::HANDSHAKE_ACK || $packet['status'] !== HandshakeStatus::SUCCESS) { $this->failHandleStart( $handle, "Client rejected handshake with code %d for stream #%d", $packet['status'], $pendingClient->streamId ); return; } $handle->sockets[$pendingClient->streamId] = $socket; if (count($handle->sockets) === 3) { $pendingClient->readWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadableChildPid'], $handle); } } public function onReadableChildPid($watcher, $socket, Handle $handle) { Loop::cancel($watcher); Loop::cancel($handle->connectTimeoutWatcher); $data = \fread($socket, 5); if ($data === false || $data === '') { return; } if (\strlen($data) !== 5) { $this->failHandleStart( $handle, 'Failed to read PID from wrapper: Received %d of 5 expected bytes', \strlen($data) ); return; } $packet = \unpack('Csignal/Npid', $data); if ($packet['signal'] !== SignalCode::CHILD_PID) { $this->failHandleStart( $handle, "Failed to read PID from wrapper: Unexpected signal code %d", $packet['signal'] ); return; } $handle->status = ProcessStatus::RUNNING; $handle->pidDeferred->resolve($packet['pid']); $handle->stdioDeferreds[0]->resolve(new ResourceOutputStream($handle->sockets[0])); $handle->stdioDeferreds[1]->resolve(new ResourceInputStream($handle->sockets[1])); $handle->stdioDeferreds[2]->resolve(new ResourceInputStream($handle->sockets[2])); $handle->exitCodeWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadableExitCode'], $handle); if (!$handle->exitCodeRequested) { Loop::unreference($handle->exitCodeWatcher); } unset($this->pendingProcesses[$handle->wrapperPid]); } public function onReadableExitCode($watcher, $socket, Handle $handle) { $handle->exitCodeWatcher = null; Loop::cancel($watcher); $data = \fread($socket, 5); if ($data === false || $data === '') { return; } if (\strlen($data) !== 5) { $handle->status = ProcessStatus::ENDED; $handle->joinDeferred->fail(new ProcessException( \sprintf('Failed to read exit code from wrapper: Received %d of 5 expected bytes', \strlen($data)) )); return; } $packet = \unpack('Csignal/Ncode', $data); if ($packet['signal'] !== SignalCode::EXIT_CODE) { $this->failHandleStart( $handle, "Failed to read exit code from wrapper: Unexpected signal code %d", $packet['signal'] ); return; } $handle->status = ProcessStatus::ENDED; $handle->joinDeferred->resolve($packet['code']); } public function onClientSocketConnectTimeout($watcher, $socket) { $id = (int) $socket; Loop::cancel($this->pendingClients[$id]->readWatcher); unset($this->pendingClients[$id]); \fclose($socket); } public function onServerSocketReadable() { $socket = \stream_socket_accept($this->server); if (!\stream_set_blocking($socket, false)) { throw new \Error("Failed to set client socket to non-blocking mode"); } $pendingClient = new PendingSocketClient; $pendingClient->readWatcher = Loop::onReadable($socket, [$this, 'onReadableHandshake']); $pendingClient->timeoutWatcher = Loop::delay(self::CONNECT_TIMEOUT, [$this, 'onClientSocketConnectTimeout'], $socket); $this->pendingClients[(int) $socket] = $pendingClient; } public function onProcessConnectTimeout($watcher, Handle $handle) { $status = \proc_get_status($handle->proc); $error = null; if (!$status['running']) { $error = \stream_get_contents($handle->wrapperStderrPipe); } $error = $error ?: 'Process did not connect to server before timeout elapsed'; \fclose($handle->wrapperStderrPipe); \proc_close($handle->proc); foreach ($handle->sockets as $socket) { \fclose($socket); } $error = new ProcessException(\trim($error)); foreach ($handle->stdioDeferreds as $deferred) { $deferred->fail($error); } } public function registerPendingProcess(Handle $handle) { $handle->connectTimeoutWatcher = Loop::delay(self::CONNECT_TIMEOUT, [$this, 'onProcessConnectTimeout'], $handle); $this->pendingProcesses[$handle->wrapperPid] = $handle; } }