1
0
mirror of https://github.com/danog/ipc.git synced 2024-11-26 20:15:05 +01:00

Implement handshake

This commit is contained in:
Daniil Gentili 2020-07-19 16:49:18 +02:00
parent 8d8ea4cb79
commit e4e7f8b124
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
13 changed files with 281 additions and 70 deletions

View File

@ -3,13 +3,20 @@
namespace Amp\Ipc;
use Amp\Deferred;
use Amp\Ipc\Signaling\Init;
use Amp\Ipc\Signaling\InitAck;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use function Amp\asyncCall;
class IpcServer
{
/** @var int Server version */
const VERSION = 1;
/** @var resource|null */
private $server;
@ -26,7 +33,7 @@ class IpcServer
* @param string $uri Local endpoint on which to listen for requests
* @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes)
*/
public function __construct(string $uri = '', bool $useFIFO = false)
public function __construct(string $uri = '', string $pwd = '', bool $useFIFO = false)
{
if (!$uri) {
$suffix = \bin2hex(\random_bytes(10));
@ -75,7 +82,7 @@ class IpcServer
}
$acceptor = &$this->acceptor;
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$acceptor, $fifo): void {
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$acceptor, $fifo, $pwd): void {
if ($fifo) {
$length = \unpack('v', \fread($server, 2))[1];
if (!$length) {
@ -104,13 +111,13 @@ class IpcServer
return; // Could not open fifo
}
}
$channel = new ChannelledSocket(...$sockets);
$channel = new ChannelledSocket($pwd, ...$sockets);
} else {
// Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure.
if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking.
return; // Accepting client failed.
}
$channel = new ChannelledSocket($client, $client);
$channel = new ChannelledSocket($pwd, $client, $client);
}
$deferred = $acceptor;
@ -118,7 +125,30 @@ class IpcServer
\assert($deferred !== null);
$deferred->resolve($channel);
$channel->receive()->onResolve(static function ($result, $error) use ($channel, $deferred, $pwd): \Generator {
if ($error) {
throw $error;
}
if ($result instanceof Init) {
if ($result->getPassword() !== $pwd) {
yield $channel->send(InitAck::wrongPassword());
yield $channel->disconnect();
} else if ($result->getVersion() !== self::VERSION) {
yield $channel->send(InitAck::wrongPassword());
yield $channel->disconnect();
} else {
yield $channel->send(InitAck::ok());
$deferred->resolve($channel);
}
} else {
if ($pwd === '') {
$deferred->resolve($channel);
} else {
yield $channel->disconnect();
}
}
$channel = null;
});
if (!$acceptor) {
Loop::disable($watcher);
@ -129,7 +159,7 @@ class IpcServer
}
/**
* Destructor function
* Destructor function.
*/
public function __destruct()
{

View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Signaling;
final class CloseAck implements Inband
{
}

View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Signaling;
final class CloseReq implements Inband
{
}

7
lib/Signaling/Inband.php Normal file
View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Signaling;
interface Inband
{
}

57
lib/Signaling/Init.php Normal file
View File

@ -0,0 +1,57 @@
<?php
namespace Amp\Ipc\Signaling;
final class Init
{
const MAIN = 0;
const STREAM = 1;
/** @var string Password */
private $password;
/** @var int Channel type */
private $type;
/** @var int Channel version */
private $version;
/**
* Constructor function
*
* @param string $password Server password
* @param integer $type Chanel type
* @param integer $version Chanel version
*/
public function __construct(string $password, int $type, int $version)
{
$this->password = $password;
$this->type = $type;
$this->version = $version;
}
/**
* Get init password
*
* @return string
*/
public function getPassword(): string
{
return $this->password;
}
/**
* Get channel type
*
* @return integer
*/
public function getType(): int
{
return $this->type;
}
/**
* Get channel version
*
* @return integer
*/
public function getVersion(): int
{
return $this->version;
}
}

60
lib/Signaling/InitAck.php Normal file
View File

@ -0,0 +1,60 @@
<?php
namespace Amp\Ipc\Signaling;
final class InitAck
{
const STATUS_OK = 0;
const STATUS_WRONG_PASSWORD = 1;
const STATUS_WRONG_VERSION = 2;
/** @var int Status */
private $status;
/**
* Constructor
*
* @param integer $status Init status
*/
public function __construct(int $status)
{
$this->status = $status;
}
/**
* Get init status
*
* @return integer
*/
public function getStatus(): int
{
return $this->status;
}
/**
* OK
*
* @return self
*/
public static function ok(): self
{
return new self(self::STATUS_OK);
}
/**
* Wrong password
*
* @return self
*/
public static function wrongPassword(): self
{
return new self(self::STATUS_WRONG_PASSWORD);
}
/**
* Wrong version
*
* @return self
*/
public static function wrongversion(): self
{
return new self(self::STATUS_WRONG_VERSION);
}
}

42
lib/Stream/Msg.php Normal file
View File

@ -0,0 +1,42 @@
<?php
namespace Amp\Ipc\Stream;
use Amp\Ipc\Signaling\Generic;
final class StreamMsg implements Generic
{
/** @var int stream ID */
private $streamId;
/** @var mixed Message payload */
private $payload;
/**
* Constructo stream message.
*
* @param int $streamId Stream ID
* @param mixed $payload Payload
*/
public function __construct(int $streamId, $payload)
{
$this->streamId = $streamId;
$this->payload = $payload;
}
/**
* Get stream ID.
*
* @return int
*/
public function getStreamId(): int
{
return $this->streamId;
}
/**
* Get message payload.
*
* @return mixed
*/
public function getPayload()
{
return $this->payload;
}
}

View File

@ -5,8 +5,10 @@ namespace Amp\Ipc\Sync;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Deferred;
use Amp\Ipc\Sync\Signaling\CloseAck;
use Amp\Ipc\Sync\Signaling\CloseReq;
use Amp\Ipc\Signaling\CloseAck;
use Amp\Ipc\Signaling\CloseReq;
use Amp\Ipc\Signaling\Inband;
use Amp\Ipc\Signaling\StreamMsg;
use Amp\Promise;
use Amp\Success;
@ -39,14 +41,19 @@ final class ChannelledSocket implements Channel
/** @var bool */
private $reading = false;
/** @var string Server password */
private $password = '';
/**
* @param string $pwd Server password.
* @param resource $read Readable stream resource.
* @param resource $write Writable stream resource.
*
* @throws \Error If a stream resource is not given for $resource.
*/
public function __construct($read, $write)
public function __construct($pwd, $read, $write)
{
$this->password = $pwd;
$this->channel = new ChannelledStream(
$this->read = new ResourceInputStream($read),
$this->write = new ResourceOutputStream($write)
@ -66,19 +73,19 @@ final class ChannelledSocket implements Channel
$data = yield $this->channel->receive();
$this->reading = false;
if (!$data instanceof Inband) {
return $data;
}
if ($data instanceof CloseReq) {
yield $this->channel->send(new CloseAck);
$this->state = self::GOT_FIN_MASK;
yield $this->disconnect();
return null;
} elseif ($data instanceof CloseAck) {
$closePromise = $this->closePromise;
$this->closePromise = null;
$closePromise->resolve($data);
return null;
}
return $data;
});
}
@ -126,6 +133,17 @@ final class ChannelledSocket implements Channel
return $this->channel->send($data);
}
/**
* Wrap stream for usage over IPC channel
*
* @param InputStream|OutputStream|mixed $stream Stream
*
* @return inpu
*/
public function wrap($stream): Promise
{
}
/**
* {@inheritdoc}
*/

View File

@ -1,7 +0,0 @@
<?php
namespace Amp\Ipc\Sync\Signaling;
final class CloseAck implements ChannelCloseMsg
{
}

View File

@ -1,7 +0,0 @@
<?php
namespace Amp\Ipc\Sync\Signaling;
interface lCloseMsg
{
}

View File

@ -1,7 +0,0 @@
<?php
namespace Amp\Ipc\Sync\Signaling;
final class CloseReq implements ChannelCloseMsg
{
}

View File

@ -1,31 +0,0 @@
<?php
namespace Amp\Ipc\Sync\Signaling;
class Init
{
/**
* Stream ID.
*
* @var string
*/
private $streamId = '';
/**
* Constructor.
*
* @param string $streamId Optional stream ID
*/
public function __construct(string $streamId = '')
{
$this->streamId =$streamId;
}
/**
* Get strema ID.
*
* @return string
*/
public function getStreamId(): string
{
return $this->streamId;
}
}

View File

@ -2,6 +2,8 @@
namespace Amp\Ipc;
use Amp\Ipc\Signaling\Init;
use Amp\Ipc\Signaling\InitAck;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Promise;
@ -11,23 +13,40 @@ use function Amp\call;
* Create IPC server.
*
* @param string $uri Local endpoint on which to listen for requests
* @param string $pwd Optional authentication password
*
* @return IpcServer
*/
function listen(string $uri): IpcServer
function listen(string $uri, string $pwd = ''): IpcServer
{
return new IpcServer($uri);
return new IpcServer($uri, $pwd);
}
/**
* Connect to IPC server.
*
* @param string $uri URI
* @param string $pwd Optional authentication password
*
* @return Promise<ChannelledSocket>
*/
function connect(string $uri): Promise
function connect(string $uri, string $pwd = ''): Promise
{
return call(static function () use ($uri) {
return connectInternal($uri, $pwd, Init::MAIN);
}
/**
* Connect to IPC server (internal function, don't use).
*
* @param string $uri URI
* @param string $pwd Optional authentication password
* @param int $type Socket type
*
* @internal Internal method
*
* @return Promise<ChannelledSocket>
*/
function connectInternal(string $uri, string $pwd, int $type): Promise
{
return call(static function () use ($uri, $pwd, $type) {
if (!\file_exists($uri)) {
throw new \RuntimeException("The endpoint does not exist!");
}
@ -87,6 +106,22 @@ function connect(string $uri): Promise
\fclose($tempSocket);
$tempSocket = null;
return new ChannelledSocket(...$sockets);
$channel = new ChannelledSocket(...$sockets);
yield $channel->send(new Init($pwd, $type, IpcServer::VERSION));
$ack = yield $channel->receive();
if (!$ack instanceof InitAck) {
throw new \RuntimeException("Received invalid init ACK!");
}
if ($ack->getStatus() === InitAck::STATUS_OK) {
return $channel;
}
yield $channel->disconnect();
$status = $ack->getStatus();
if ($status === InitAck::STATUS_WRONG_PASSWORD) {
throw new \RuntimeException("Wrong IPC server password!");
} elseif ($status === InitAck::STATUS_WRONG_VERSION) {
throw new \RuntimeException("Wrong IPC server version, please upgrade client or server!");
}
throw new \RuntimeException("Invalid IPC InitAck status: $status");
});
}