From e4e7f8b124919d0a8c1433f1736dd55eae218840 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sun, 19 Jul 2020 16:49:18 +0200 Subject: [PATCH] Implement handshake --- lib/IpcServer.php | 42 +++++++++++++++++++---- lib/Signaling/CloseAck.php | 7 ++++ lib/Signaling/CloseReq.php | 7 ++++ lib/Signaling/Inband.php | 7 ++++ lib/Signaling/Init.php | 57 +++++++++++++++++++++++++++++++ lib/Signaling/InitAck.php | 60 +++++++++++++++++++++++++++++++++ lib/Stream/Msg.php | 42 +++++++++++++++++++++++ lib/Sync/ChannelledSocket.php | 32 ++++++++++++++---- lib/Sync/Signaling/CloseAck.php | 7 ---- lib/Sync/Signaling/CloseMsg.php | 7 ---- lib/Sync/Signaling/CloseReq.php | 7 ---- lib/Sync/Signaling/Init.php | 31 ----------------- lib/functions.php | 45 ++++++++++++++++++++++--- 13 files changed, 281 insertions(+), 70 deletions(-) create mode 100644 lib/Signaling/CloseAck.php create mode 100644 lib/Signaling/CloseReq.php create mode 100644 lib/Signaling/Inband.php create mode 100644 lib/Signaling/Init.php create mode 100644 lib/Signaling/InitAck.php create mode 100644 lib/Stream/Msg.php delete mode 100644 lib/Sync/Signaling/CloseAck.php delete mode 100644 lib/Sync/Signaling/CloseMsg.php delete mode 100644 lib/Sync/Signaling/CloseReq.php delete mode 100644 lib/Sync/Signaling/Init.php diff --git a/lib/IpcServer.php b/lib/IpcServer.php index d774d80..e5e074d 100644 --- a/lib/IpcServer.php +++ b/lib/IpcServer.php @@ -3,13 +3,20 @@ namespace Amp\Ipc; use Amp\Deferred; +use Amp\Ipc\Signaling\Init; +use Amp\Ipc\Signaling\InitAck; use Amp\Ipc\Sync\ChannelledSocket; use Amp\Loop; use Amp\Promise; use Amp\Success; +use function Amp\asyncCall; + class IpcServer { + /** @var int Server version */ + const VERSION = 1; + /** @var resource|null */ private $server; @@ -26,7 +33,7 @@ class IpcServer * @param string $uri Local endpoint on which to listen for requests * @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes) */ - public function __construct(string $uri = '', bool $useFIFO = false) + public function __construct(string $uri = '', string $pwd = '', bool $useFIFO = false) { if (!$uri) { $suffix = \bin2hex(\random_bytes(10)); @@ -75,7 +82,7 @@ class IpcServer } $acceptor = &$this->acceptor; - $this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$acceptor, $fifo): void { + $this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$acceptor, $fifo, $pwd): void { if ($fifo) { $length = \unpack('v', \fread($server, 2))[1]; if (!$length) { @@ -104,13 +111,13 @@ class IpcServer return; // Could not open fifo } } - $channel = new ChannelledSocket(...$sockets); + $channel = new ChannelledSocket($pwd, ...$sockets); } else { // Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure. if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking. return; // Accepting client failed. } - $channel = new ChannelledSocket($client, $client); + $channel = new ChannelledSocket($pwd, $client, $client); } $deferred = $acceptor; @@ -118,7 +125,30 @@ class IpcServer \assert($deferred !== null); - $deferred->resolve($channel); + $channel->receive()->onResolve(static function ($result, $error) use ($channel, $deferred, $pwd): \Generator { + if ($error) { + throw $error; + } + if ($result instanceof Init) { + if ($result->getPassword() !== $pwd) { + yield $channel->send(InitAck::wrongPassword()); + yield $channel->disconnect(); + } else if ($result->getVersion() !== self::VERSION) { + yield $channel->send(InitAck::wrongPassword()); + yield $channel->disconnect(); + } else { + yield $channel->send(InitAck::ok()); + $deferred->resolve($channel); + } + } else { + if ($pwd === '') { + $deferred->resolve($channel); + } else { + yield $channel->disconnect(); + } + } + $channel = null; + }); if (!$acceptor) { Loop::disable($watcher); @@ -129,7 +159,7 @@ class IpcServer } /** - * Destructor function + * Destructor function. */ public function __destruct() { diff --git a/lib/Signaling/CloseAck.php b/lib/Signaling/CloseAck.php new file mode 100644 index 0000000..9b902ee --- /dev/null +++ b/lib/Signaling/CloseAck.php @@ -0,0 +1,7 @@ +password = $password; + $this->type = $type; + $this->version = $version; + } + /** + * Get init password + * + * @return string + */ + public function getPassword(): string + { + return $this->password; + } + /** + * Get channel type + * + * @return integer + */ + public function getType(): int + { + return $this->type; + } + /** + * Get channel version + * + * @return integer + */ + public function getVersion(): int + { + return $this->version; + } +} diff --git a/lib/Signaling/InitAck.php b/lib/Signaling/InitAck.php new file mode 100644 index 0000000..f6121c4 --- /dev/null +++ b/lib/Signaling/InitAck.php @@ -0,0 +1,60 @@ +status = $status; + } + /** + * Get init status + * + * @return integer + */ + public function getStatus(): int + { + return $this->status; + } + + /** + * OK + * + * @return self + */ + public static function ok(): self + { + return new self(self::STATUS_OK); + } + /** + * Wrong password + * + * @return self + */ + public static function wrongPassword(): self + { + return new self(self::STATUS_WRONG_PASSWORD); + } + /** + * Wrong version + * + * @return self + */ + public static function wrongversion(): self + { + return new self(self::STATUS_WRONG_VERSION); + } +} \ No newline at end of file diff --git a/lib/Stream/Msg.php b/lib/Stream/Msg.php new file mode 100644 index 0000000..a1a5705 --- /dev/null +++ b/lib/Stream/Msg.php @@ -0,0 +1,42 @@ +streamId = $streamId; + $this->payload = $payload; + } + /** + * Get stream ID. + * + * @return int + */ + public function getStreamId(): int + { + return $this->streamId; + } + /** + * Get message payload. + * + * @return mixed + */ + public function getPayload() + { + return $this->payload; + } +} diff --git a/lib/Sync/ChannelledSocket.php b/lib/Sync/ChannelledSocket.php index 9291bf0..81478ac 100644 --- a/lib/Sync/ChannelledSocket.php +++ b/lib/Sync/ChannelledSocket.php @@ -5,8 +5,10 @@ namespace Amp\Ipc\Sync; use Amp\ByteStream\ResourceInputStream; use Amp\ByteStream\ResourceOutputStream; use Amp\Deferred; -use Amp\Ipc\Sync\Signaling\CloseAck; -use Amp\Ipc\Sync\Signaling\CloseReq; +use Amp\Ipc\Signaling\CloseAck; +use Amp\Ipc\Signaling\CloseReq; +use Amp\Ipc\Signaling\Inband; +use Amp\Ipc\Signaling\StreamMsg; use Amp\Promise; use Amp\Success; @@ -39,14 +41,19 @@ final class ChannelledSocket implements Channel /** @var bool */ private $reading = false; + /** @var string Server password */ + private $password = ''; + /** + * @param string $pwd Server password. * @param resource $read Readable stream resource. * @param resource $write Writable stream resource. * * @throws \Error If a stream resource is not given for $resource. */ - public function __construct($read, $write) + public function __construct($pwd, $read, $write) { + $this->password = $pwd; $this->channel = new ChannelledStream( $this->read = new ResourceInputStream($read), $this->write = new ResourceOutputStream($write) @@ -66,19 +73,19 @@ final class ChannelledSocket implements Channel $data = yield $this->channel->receive(); $this->reading = false; + if (!$data instanceof Inband) { + return $data; + } + if ($data instanceof CloseReq) { yield $this->channel->send(new CloseAck); $this->state = self::GOT_FIN_MASK; yield $this->disconnect(); - return null; } elseif ($data instanceof CloseAck) { $closePromise = $this->closePromise; $this->closePromise = null; $closePromise->resolve($data); - return null; } - - return $data; }); } @@ -126,6 +133,17 @@ final class ChannelledSocket implements Channel return $this->channel->send($data); } + /** + * Wrap stream for usage over IPC channel + * + * @param InputStream|OutputStream|mixed $stream Stream + * + * @return inpu + */ + public function wrap($stream): Promise + { + + } /** * {@inheritdoc} */ diff --git a/lib/Sync/Signaling/CloseAck.php b/lib/Sync/Signaling/CloseAck.php deleted file mode 100644 index e718789..0000000 --- a/lib/Sync/Signaling/CloseAck.php +++ /dev/null @@ -1,7 +0,0 @@ -streamId =$streamId; - } - /** - * Get strema ID. - * - * @return string - */ - public function getStreamId(): string - { - return $this->streamId; - } -} diff --git a/lib/functions.php b/lib/functions.php index 97a234a..6776195 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -2,6 +2,8 @@ namespace Amp\Ipc; +use Amp\Ipc\Signaling\Init; +use Amp\Ipc\Signaling\InitAck; use Amp\Ipc\Sync\ChannelledSocket; use Amp\Promise; @@ -11,23 +13,40 @@ use function Amp\call; * Create IPC server. * * @param string $uri Local endpoint on which to listen for requests + * @param string $pwd Optional authentication password * * @return IpcServer */ -function listen(string $uri): IpcServer +function listen(string $uri, string $pwd = ''): IpcServer { - return new IpcServer($uri); + return new IpcServer($uri, $pwd); } /** * Connect to IPC server. * * @param string $uri URI + * @param string $pwd Optional authentication password * * @return Promise */ -function connect(string $uri): Promise +function connect(string $uri, string $pwd = ''): Promise { - return call(static function () use ($uri) { + return connectInternal($uri, $pwd, Init::MAIN); +} +/** + * Connect to IPC server (internal function, don't use). + * + * @param string $uri URI + * @param string $pwd Optional authentication password + * @param int $type Socket type + * + * @internal Internal method + * + * @return Promise + */ +function connectInternal(string $uri, string $pwd, int $type): Promise +{ + return call(static function () use ($uri, $pwd, $type) { if (!\file_exists($uri)) { throw new \RuntimeException("The endpoint does not exist!"); } @@ -87,6 +106,22 @@ function connect(string $uri): Promise \fclose($tempSocket); $tempSocket = null; - return new ChannelledSocket(...$sockets); + $channel = new ChannelledSocket(...$sockets); + yield $channel->send(new Init($pwd, $type, IpcServer::VERSION)); + $ack = yield $channel->receive(); + if (!$ack instanceof InitAck) { + throw new \RuntimeException("Received invalid init ACK!"); + } + if ($ack->getStatus() === InitAck::STATUS_OK) { + return $channel; + } + yield $channel->disconnect(); + $status = $ack->getStatus(); + if ($status === InitAck::STATUS_WRONG_PASSWORD) { + throw new \RuntimeException("Wrong IPC server password!"); + } elseif ($status === InitAck::STATUS_WRONG_VERSION) { + throw new \RuntimeException("Wrong IPC server version, please upgrade client or server!"); + } + throw new \RuntimeException("Invalid IPC InitAck status: $status"); }); }