diff --git a/README.md b/README.md index 4e55d28..07c39bf 100644 --- a/README.md +++ b/README.md @@ -4,19 +4,76 @@ [![CoverageStatus](https://img.shields.io/coveralls/amphp/template/master.svg?style=flat-square)](https://coveralls.io/github/amphp/template?branch=master) ![License](https://img.shields.io/badge/license-MIT-blue.svg?style=flat-square) -`amphp/template` provides a template for AMPHP repos. +`amphp/ipc` provides an async IPC server. ## Installation ```bash -composer require amphp/template +composer require amphp/ipc ``` ## Example +Server: + ```php receive()) { + echo "Received $payload".PHP_EOL; + if ($payload === 'ping') { + yield $socket->send('pong'); + } + } + yield $socket->disconnect(); + echo "Closed connection".PHP_EOL; + }; + + $server = new IpcServer(sys_get_temp_dir().'/test'); + while ($socket = yield $server->accept()) { + asyncCall($clientHandler, $socket); + } +}); ``` + +Client: + +```php +receive()) { + echo "Received $payload".PHP_EOL; + yield $socket->disconnect(); + } + echo "Closed connection".PHP_EOL; + }; + + $channel = yield connect(sys_get_temp_dir().'/test'); + asyncCall($clientHandler, $channel); + yield $channel->send('ping'); +}); +``` \ No newline at end of file diff --git a/composer.json b/composer.json index 065ef25..15ac609 100644 --- a/composer.json +++ b/composer.json @@ -26,12 +26,14 @@ ], "require": { "php": ">=7.1", - "amphp/byte-stream": "^1.7" + "amphp/byte-stream": "^1.7", + "amphp/parser": "^1.0" }, "require-dev": { "amphp/amp": "^2.4", - "phpunit/phpunit": "^8 || ^7", - "amphp/phpunit-util": "^1.1", + "amphp/parallel": "^1.3", + "phpunit/phpunit": "^9", + "amphp/phpunit-util": "^1.3", "amphp/php-cs-fixer-config": "dev-master" }, "autoload": { diff --git a/examples/client.php b/examples/client.php index bceb587..4627563 100644 --- a/examples/client.php +++ b/examples/client.php @@ -14,7 +14,7 @@ Loop::run(static function () { while ($payload = yield $socket->receive()) { echo "Received $payload".PHP_EOL; - $socket->close(); + yield $socket->disconnect(); } echo "Closed connection".PHP_EOL; }; diff --git a/examples/server.php b/examples/server.php index 094c7e3..56f5638 100644 --- a/examples/server.php +++ b/examples/server.php @@ -18,11 +18,12 @@ Loop::run(static function () { yield $socket->send('pong'); } } - echo "Closed connection".PHP_EOL; + yield $socket->disconnect(); + echo "Closed connection".PHP_EOL."==========".PHP_EOL; }; $server = new IpcServer(sys_get_temp_dir().'/test'); while ($socket = yield $server->accept()) { asyncCall($clientHandler, $socket); } -}); +}); \ No newline at end of file diff --git a/lib/Sync/ChannelCloseAck.php b/lib/Sync/ChannelCloseAck.php new file mode 100644 index 0000000..d78a35a --- /dev/null +++ b/lib/Sync/ChannelCloseAck.php @@ -0,0 +1,7 @@ +channel->receive(); + if (!$this->channel) { + return new Success(); + } + return call(function (): \Generator { + $data = yield $this->channel->receive(); + + if ($data instanceof ChannelCloseReq) { + yield $this->channel->send(new ChannelCloseAck); + $this->state = self::GOT_FIN_MASK; + return null; + } + + return $data; + }); + } + + /** + * Cleanly disconnect from other endpoint. + * + * @return Promise + */ + public function disconnect(): Promise + { + if (!$this->channel) { + throw new ChannelException('The channel was already closed!'); + } + $channel = $this->channel; + $this->channel = null; + return call(function () use ($channel): \Generator { + yield $channel->send(new ChannelCloseReq); + + do { + $data = yield $channel->receive(); + + if ($data instanceof ChannelCloseReq) { + yield $channel->send(new ChannelCloseAck); + $this->state |= self::GOT_FIN_MASK; + } else if ($data instanceof ChannelCloseAck) { + $this->state |= self::GOT_ACK_MASK; + } + } while ($this->state !== self::GOT_ALL_MASK); + + + $this->close(); + }); } /** @@ -44,14 +101,23 @@ final class ChannelledSocket implements Channel */ public function send($data): Promise { + if (!$this->channel) { + throw new ChannelException('The channel was already closed!'); + } return $this->channel->send($data); } + /** + * {@inheritdoc} + */ public function unreference() { $this->read->unreference(); } + /** + * {@inheritdoc} + */ public function reference() { $this->read->reference(); @@ -60,7 +126,7 @@ final class ChannelledSocket implements Channel /** * Closes the read and write resource streams. */ - public function close() + private function close() { $this->read->close(); $this->write->close(); diff --git a/lib/Sync/ChannelledStream.php b/lib/Sync/ChannelledStream.php index 59a5852..c73bc09 100644 --- a/lib/Sync/ChannelledStream.php +++ b/lib/Sync/ChannelledStream.php @@ -69,7 +69,7 @@ final class ChannelledStream implements Channel } if ($chunk === null) { - return null; + throw new ChannelException("The channel closed unexpectedly. Did the context die?"); } $this->parser->push($chunk); diff --git a/lib/Sync/ExitFailure.php b/lib/Sync/ExitFailure.php deleted file mode 100644 index 1c68e70..0000000 --- a/lib/Sync/ExitFailure.php +++ /dev/null @@ -1,60 +0,0 @@ -type = \get_class($exception); - $this->message = $exception->getMessage(); - $this->code = $exception->getCode(); - $this->trace = $exception->getTraceAsString(); - - if ($previous = $exception->getPrevious()) { - $this->previous = new self($previous); - } - } - - /** - * {@inheritdoc} - */ - public function getResult() - { - throw $this->createException(); - } - - private function createException(): PanicError - { - $previous = $this->previous ? $this->previous->createException() : null; - - return new PanicError( - $this->type, - \sprintf( - 'Uncaught %s in worker with message "%s" and code "%s"; use %s::getPanicTrace() ' - . 'for the stack trace in the context', - $this->type, - $this->message, - $this->code, - PanicError::class - ), - $this->trace, - $previous - ); - } -} diff --git a/lib/Sync/ExitResult.php b/lib/Sync/ExitResult.php deleted file mode 100644 index 7b387ba..0000000 --- a/lib/Sync/ExitResult.php +++ /dev/null @@ -1,13 +0,0 @@ -result = $result; - } - - /** - * {@inheritdoc} - */ - public function getResult() - { - return $this->result; - } -} diff --git a/lib/Sync/Internal/ParcelStorage.php b/lib/Sync/Internal/ParcelStorage.php deleted file mode 100644 index 6d0906e..0000000 --- a/lib/Sync/Internal/ParcelStorage.php +++ /dev/null @@ -1,33 +0,0 @@ -value = $value; - } - - /** - * @return mixed - */ - public function get() - { - return $this->value; - } - - /** - * @param mixed $value - */ - public function set($value) - { - $this->value = $value; - } -} diff --git a/lib/Sync/SharedMemoryException.php b/lib/Sync/SharedMemoryException.php deleted file mode 100644 index 84fde24..0000000 --- a/lib/Sync/SharedMemoryException.php +++ /dev/null @@ -1,7 +0,0 @@ -init($value, $size, $permissions); - return $parcel; - } - - /** - * @param string $id - * - * @return \Amp\Ipc\Sync\SharedMemoryParcel - */ - public static function use(string $id): self - { - $parcel = new self($id); - $parcel->open(); - return $parcel; - } - - /** - * Creates a new local object container. - * - * The object given will be assigned a new object ID and will have a - * reference to it stored in memory local to the thread. - * - * @param mixed $value The value to store in the container. - * @param int $size The number of bytes to allocate for the object. - * If not specified defaults to 16384 bytes. - * @param int $permissions The access permissions to set for the object. - * If not specified defaults to 0600. - */ - private function __construct(string $id) - { - if (!\extension_loaded("shmop")) { - throw new \Error(__CLASS__ . " requires the shmop extension."); - } - - $this->id = $id; - $this->key = self::makeKey($this->id); - } - - /** - * @param mixed $value - * @param int $size - * @param int $permissions - */ - private function init($value, int $size = 8192, int $permissions = 0600) - { - $this->semaphore = PosixSemaphore::create($this->id, 1); - $this->initializer = \getmypid(); - - $this->memOpen($this->key, 'n', $permissions, $size + self::MEM_DATA_OFFSET); - $this->setHeader(self::STATE_ALLOCATED, 0, $permissions); - $this->wrap($value); - } - - private function open() - { - $this->semaphore = PosixSemaphore::use($this->id); - $this->memOpen($this->key, 'w', 0, 0); - } - - /** - * Checks if the object has been freed. - * - * Note that this does not check if the object has been destroyed; it only - * checks if this handle has freed its reference to the object. - * - * @return bool True if the object is freed, otherwise false. - */ - private function isFreed(): bool - { - // If we are no longer connected to the memory segment, check if it has - // been invalidated. - if ($this->handle !== null) { - $this->handleMovedMemory(); - $header = $this->getHeader(); - return $header['state'] === static::STATE_FREED; - } - - return true; - } - - /** - * {@inheritdoc} - */ - public function unwrap(): Promise - { - if ($this->isFreed()) { - return new Failure(new SharedMemoryException('The object has already been freed.')); - } - - $header = $this->getHeader(); - - // Make sure the header is in a valid state and format. - if ($header['state'] !== self::STATE_ALLOCATED || $header['size'] <= 0) { - new Failure(new SharedMemoryException('Shared object memory is corrupt.')); - } - - // Read the actual value data from memory and unserialize it. - $data = $this->memGet(self::MEM_DATA_OFFSET, $header['size']); - return new Success(\unserialize($data)); - } - - /** - * If the value requires more memory to store than currently allocated, a - * new shared memory segment will be allocated with a larger size to store - * the value in. The previous memory segment will be cleaned up and marked - * for deletion. Other processes and threads will be notified of the new - * memory segment on the next read attempt. Once all running processes and - * threads disconnect from the old segment, it will be freed by the OS. - */ - protected function wrap($value) - { - if ($this->isFreed()) { - throw new SharedMemoryException('The object has already been freed.'); - } - - $serialized = \serialize($value); - $size = \strlen($serialized); - $header = $this->getHeader(); - - /* If we run out of space, we need to allocate a new shared memory - segment that is larger than the current one. To coordinate with other - processes, we will leave a message in the old segment that the segment - has moved and along with the new key. The old segment will be discarded - automatically after all other processes notice the change and close - the old handle. - */ - if (\shmop_size($this->handle) < $size + self::MEM_DATA_OFFSET) { - $this->key = $this->key < 0xffffffff ? $this->key + 1 : \mt_rand(0x10, 0xfffffffe); - $this->setHeader(self::STATE_MOVED, $this->key, 0); - - $this->memDelete(); - \shmop_close($this->handle); - - $this->memOpen($this->key, 'n', $header['permissions'], $size * 2); - } - - // Rewrite the header and the serialized value to memory. - $this->setHeader(self::STATE_ALLOCATED, $size, $header['permissions']); - $this->memSet(self::MEM_DATA_OFFSET, $serialized); - } - - /** - * {@inheritdoc} - */ - public function synchronized(callable $callback): Promise - { - return call(function () use ($callback) { - /** @var \Amp\Sync\Lock $lock */ - $lock = yield $this->semaphore->acquire(); - - try { - $result = yield call($callback, yield $this->unwrap()); - - if ($result !== null) { - $this->wrap($result); - } - } finally { - $lock->release(); - } - - return $result; - }); - } - - - /** - * Frees the shared object from memory. - * - * The memory containing the shared value will be invalidated. When all - * process disconnect from the object, the shared memory block will be - * destroyed by the OS. - */ - public function __destruct() - { - if ($this->initializer === 0 || $this->initializer !== \getmypid()) { - return; - } - - if ($this->isFreed()) { - return; - } - - // Invalidate the memory block by setting its state to FREED. - $this->setHeader(static::STATE_FREED, 0, 0); - - // Request the block to be deleted, then close our local handle. - $this->memDelete(); - \shmop_close($this->handle); - $this->handle = null; - - $this->semaphore = null; - } - - /** - * Private method to prevent cloning. - */ - private function __clone() - { - } - - /** - * Private method to prevent serialization. - */ - private function __sleep() - { - } - - /** - * Updates the current memory segment handle, handling any moves made on the - * data. - */ - private function handleMovedMemory() - { - // Read from the memory block and handle moved blocks until we find the - // correct block. - while (true) { - $header = $this->getHeader(); - - // If the state is STATE_MOVED, the memory is stale and has been moved - // to a new location. Move handle and try to read again. - if ($header['state'] !== self::STATE_MOVED) { - break; - } - - \shmop_close($this->handle); - $this->key = $header['size']; - $this->memOpen($this->key, 'w', 0, 0); - } - } - - /** - * Reads and returns the data header at the current memory segment. - * - * @return array An associative array of header data. - */ - private function getHeader(): array - { - $data = $this->memGet(0, self::MEM_DATA_OFFSET); - return \unpack('Cstate/Lsize/Spermissions', $data); - } - - /** - * Sets the header data for the current memory segment. - * - * @param int $state An object state. - * @param int $size The size of the stored data, or other value. - * @param int $permissions The permissions mask on the memory segment. - */ - private function setHeader(int $state, int $size, int $permissions) - { - $header = \pack('CLS', $state, $size, $permissions); - $this->memSet(0, $header); - } - - /** - * Opens a shared memory handle. - * - * @param int $key The shared memory key. - * @param string $mode The mode to open the shared memory in. - * @param int $permissions Process permissions on the shared memory. - * @param int $size The size to crate the shared memory in bytes. - */ - private function memOpen(int $key, string $mode, int $permissions, int $size) - { - $this->handle = @\shmop_open($key, $mode, $permissions, $size); - if ($this->handle === false) { - throw new SharedMemoryException('Failed to create shared memory block.'); - } - } - - /** - * Reads binary data from shared memory. - * - * @param int $offset The offset to read from. - * @param int $size The number of bytes to read. - * - * @return string The binary data at the given offset. - */ - private function memGet(int $offset, int $size): string - { - $data = \shmop_read($this->handle, $offset, $size); - if ($data === false) { - throw new SharedMemoryException('Failed to read from shared memory block.'); - } - return $data; - } - - /** - * Writes binary data to shared memory. - * - * @param int $offset The offset to write to. - * @param string $data The binary data to write. - */ - private function memSet(int $offset, string $data) - { - if (!\shmop_write($this->handle, $data, $offset)) { - throw new SharedMemoryException('Failed to write to shared memory block.'); - } - } - - /** - * Requests the shared memory segment to be deleted. - */ - private function memDelete() - { - if (!\shmop_delete($this->handle)) { - throw new SharedMemoryException('Failed to discard shared memory block.'); - } - } - - private static function makeKey(string $id): int - { - return \abs(\unpack("l", \md5($id, true))[1]); - } -} diff --git a/lib/Sync/ThreadedParcel.php b/lib/Sync/ThreadedParcel.php deleted file mode 100644 index bc1d1df..0000000 --- a/lib/Sync/ThreadedParcel.php +++ /dev/null @@ -1,62 +0,0 @@ -mutex = new ThreadedMutex; - $this->storage = new Internal\ParcelStorage($value); - } - - /** - * {@inheritdoc} - */ - public function unwrap(): Promise - { - return new Success($this->storage->get()); - } - - /** - * @return \Amp\Promise - */ - public function synchronized(callable $callback): Promise - { - return call(function () use ($callback) { - /** @var \Amp\Sync\Lock $lock */ - $lock = yield $this->mutex->acquire(); - - try { - $result = yield call($callback, $this->storage->get()); - - if ($result !== null) { - $this->storage->set($result); - } - } finally { - $lock->release(); - } - - return $result; - }); - } -} diff --git a/lib/functions.php b/lib/functions.php index 1ff3fc2..c6c1498 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -17,62 +17,65 @@ use function Amp\call; function connect(string $uri): Promise { return call(static function () use ($uri) { + if (!file_exists($uri)) { + throw new \RuntimeException("The endpoint does not exist!"); + } + $type = \filetype($uri); - if ($type === 'fifo') { - $suffix = \bin2hex(\random_bytes(10)); - $prefix = \sys_get_temp_dir()."/amp-".$suffix.".fifo"; - - if (\strlen($prefix) > 0xFFFF) { - \trigger_error("Prefix is too long!", E_USER_ERROR); - exit(1); + if ($type !== 'fifo') { + if ($type === 'file') { + $uri = \file_get_contents($uri); + } else { + $uri = "unix://$uri"; } + if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) { + $message = "Could not connect to IPC socket"; + if ($error = \error_get_last()) { + $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]); + } + throw new \RuntimeException($message); + } + return new ChannelledSocket($socket, $socket); + } - $sockets = [ + $suffix = \bin2hex(\random_bytes(10)); + $prefix = \sys_get_temp_dir()."/amp-".$suffix.".fifo"; + + if (\strlen($prefix) > 0xFFFF) { + throw new \RuntimeException('Prefix is too long!'); + } + + $sockets = [ $prefix."2", $prefix."1", ]; - foreach ($sockets as $k => &$socket) { - if (!\posix_mkfifo($socket, 0777)) { - \trigger_error("Could not create FIFO client socket", E_USER_ERROR); - exit(1); - } - - \register_shutdown_function(static function () use ($socket): void { - @\unlink($socket); - }); - - if (!$socket = \fopen($socket, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader - \trigger_error("Could not open FIFO client socket", E_USER_ERROR); - exit(1); - } + foreach ($sockets as $k => &$socket) { + if (!\posix_mkfifo($socket, 0777)) { + throw new \RuntimeException('Could not create FIFO client socket!'); } - if (!$tempSocket = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader - \trigger_error("Could not connect to FIFO server", E_USER_ERROR); - exit(1); - } - \stream_set_blocking($tempSocket, false); - \stream_set_write_buffer($tempSocket, 0); + \register_shutdown_function(static function () use ($socket): void { + @\unlink($socket); + }); - if (!\fwrite($tempSocket, \pack('v', \strlen($prefix)).$prefix)) { - \trigger_error("Failure sending request to FIFO server", E_USER_ERROR); - exit(1); + if (!$socket = \fopen($socket, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader + throw new \RuntimeException("Could not open FIFO client socket"); } - \fclose($tempSocket); - $tempSocket = null; + } - return new ChannelledSocket(...$sockets); + if (!$tempSocket = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader + throw new \RuntimeException("Could not connect to FIFO server"); } - if ($type === 'file') { - $uri = \file_get_contents($uri); - } else { - $uri = "unix://$uri"; + \stream_set_blocking($tempSocket, false); + \stream_set_write_buffer($tempSocket, 0); + + if (!\fwrite($tempSocket, \pack('v', \strlen($prefix)).$prefix)) { + throw new \RuntimeException("Failure sending request to FIFO server"); } - if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) { - \trigger_error("Could not connect to IPC socket", E_USER_ERROR); - exit(1); - } - return new ChannelledSocket($socket, $socket); + \fclose($tempSocket); + $tempSocket = null; + + return new ChannelledSocket(...$sockets); }); } diff --git a/phpunit.xml.dist b/phpunit.xml.dist index ce62c46..632555f 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,19 +1,19 @@ - + test @@ -22,7 +22,7 @@ lib - - - + + + diff --git a/test/Fixtures/server.php b/test/Fixtures/server.php new file mode 100644 index 0000000..ff60bff --- /dev/null +++ b/test/Fixtures/server.php @@ -0,0 +1,36 @@ +send($server->getUri()); + + $socket = yield $server->accept(); + + if (!$socket instanceof ChannelledSocket) { + throw new \RuntimeException('Socket is not instance of ChanneledSocket'); + } + + $ping = yield $socket->receive(); + + if ($ping !== 'ping') { + throw new \RuntimeException("Received $ping instead of ping!"); + } + + yield $socket->send('pong'); + yield $socket->disconnect(); + + $server->close(); + + return $server->accept(); +}; diff --git a/test/IpcTest.php b/test/IpcTest.php new file mode 100644 index 0000000..2e1e96d --- /dev/null +++ b/test/IpcTest.php @@ -0,0 +1,46 @@ +start(); + + $recvUri = yield $process->receive(); + if ($uri) { + $this->assertEquals($uri, $recvUri); + } + + $client = yield connect($recvUri); + $this->assertInstanceOf(ChannelledSocket::class, $client); + + yield $client->send('ping'); + $this->assertEquals('pong', yield $client->receive()); + + yield $client->disconnect(); + + $this->assertNull(yield $process->join()); + } + + public function provideUriFifo(): \Generator + { + foreach (['', sys_get_temp_dir().'/pony'] as $uri) { + if (\strncasecmp(\PHP_OS, "WIN", 3) === 0) { + yield [$uri, false]; + } else { + yield [$uri, true]; + yield [$uri, false]; + } + } + } +}