input = $input;
        $this->on(
            'close',
            /** @return void */
            function () {
                Loop\removeReadStream($this->input);
            }
        );
        Loop\addReadStream(
            $this->input,
            /** @return void */
            function () {
                if (feof($this->input)) {
                    // If stream_select reported a status change for this stream,
                    // but the stream is EOF, it means it was closed.
                    $this->emitClose();
                    return;
                }
                if (!$this->is_accepting_new_requests) {
                    // If we fork, don't read any bytes in the input buffer from the worker process.
                    $this->emitClose();
                    return;
                }
                $emitted_messages = $this->readMessages();
                if ($emitted_messages > 0) {
                    $this->emit('readMessageGroup');
                }
            }
        );
    }

    /**
     * Consumes the bytes currently readable from the input stream, one byte at a time,
     * and emits a 'message' event for every complete message that could be parsed.
     *
     * Framing handled here: header lines terminated by "\r\n", then an empty "\r\n" line,
     * then a body of exactly Content-Length bytes.
     *
     * Reading stops early (after emitting 'close') as soon as is_accepting_new_requests
     * is false once a full body has been buffered, or after a 'message' listener has run.
     *
     * @return int the number of 'message' events emitted during this call
     */
    private function readMessages() : int
    {
        $emitted_messages = 0;
        while (($c = fgetc($this->input)) !== false && $c !== '') {
            $this->buffer .= $c;
            switch ($this->parsing_mode) {
                case self::PARSE_HEADERS:
                    if ($this->buffer === "\r\n") {
                        // An empty line terminates the header section.
                        // A missing Content-Length header is treated as a length of 0.
                        $this->content_length = (int)($this->headers['Content-Length'] ?? 0);
                        $this->buffer = '';
                        if ($this->content_length > 0) {
                            $this->parsing_mode = self::PARSE_BODY;
                        } else {
                            // No body can be framed. The body check below only runs after a byte
                            // was appended, so a length of 0 would never match and the buffer
                            // would grow without bound. Drop these headers and keep parsing headers.
                            $this->headers = [];
                        }
                    } elseif (substr($this->buffer, -2) === "\r\n") {
                        // Split on the first colon only, so values containing ':' are kept intact.
                        $parts = explode(':', $this->buffer, 2);
                        if (count($parts) === 2) {
                            $this->headers[$parts[0]] = trim($parts[1]);
                        }
                        // A line without a colon is not a header; it is discarded.
                        $this->buffer = '';
                    }
                    break;
                case self::PARSE_BODY:
                    // Content-Length counts bytes, so the byte-wise strlen() is the right measure.
                    if (strlen($this->buffer) === $this->content_length) {
                        if (!$this->is_accepting_new_requests) {
                            // If we fork, don't read any bytes in the input buffer from the worker process.
                            $this->emitClose();
                            return $emitted_messages;
                        }
                        // MessageBody::parse can throw an Error, maybe log an error?
                        try {
                            $msg = new Message(MessageBody::parse($this->buffer), $this->headers);
                        } catch (Exception $_) {
                            // The unparseable message is skipped; parser state is still reset below.
                            $msg = null;
                        }
                        if ($msg) {
                            $emitted_messages++;
                            $this->emit('message', [$msg]);
                            // The 'message' listeners may have changed is_accepting_new_requests.
                            /** @psalm-suppress DocblockTypeContradiction */
                            if (!$this->is_accepting_new_requests) {
                                // If we fork, don't read any bytes in the input buffer from the worker process.
                                $this->emitClose();
                                return $emitted_messages;
                            }
                        }
                        // Reset the parser state for the next message.
                        $this->parsing_mode = self::PARSE_HEADERS;
                        $this->headers = [];
                        $this->buffer = '';
                    }
                    break;
            }
        }
        return $emitted_messages;
    }

    /**
     * Emits the 'close' event at most once, no matter how many times this is called.
     *
     * @return void
     */
    private function emitClose()
    {
        if ($this->did_emit_close) {
            return;
        }
        $this->did_emit_close = true;
        $this->emit('close');
    }
}