From 219b05c9da66a585df44a09375304248b8f0a164 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 14 Feb 2020 20:31:11 +0100 Subject: [PATCH] First commit --- .gitignore | 7 + .php_cs.dist | 13 + .travis.yml | 39 +++ CONTRIBUTING.md | 41 +++ LICENSE | 21 ++ README.md | 22 ++ appveyor.yml | 38 +++ composer.json | 59 +++++ examples/client.php | 25 ++ examples/server.php | 28 ++ lib/IpcServer.php | 216 ++++++++++++++++ lib/PendingAcceptError.php | 17 ++ lib/Sync/Channel.php | 36 +++ lib/Sync/ChannelException.php | 7 + lib/Sync/ChannelParser.php | 88 +++++++ lib/Sync/ChannelledSocket.php | 68 +++++ lib/Sync/ChannelledStream.php | 81 ++++++ lib/Sync/ExitFailure.php | 60 +++++ lib/Sync/ExitResult.php | 13 + lib/Sync/ExitSuccess.php | 22 ++ lib/Sync/Internal/ParcelStorage.php | 33 +++ lib/Sync/PanicError.php | 48 ++++ lib/Sync/Parcel.php | 38 +++ lib/Sync/SerializationException.php | 7 + lib/Sync/SharedMemoryException.php | 7 + lib/Sync/SharedMemoryParcel.php | 385 ++++++++++++++++++++++++++++ lib/Sync/SynchronizationError.php | 7 + lib/Sync/ThreadedParcel.php | 62 +++++ lib/functions.php | 78 ++++++ phpunit.xml.dist | 28 ++ 30 files changed, 1594 insertions(+) create mode 100644 .gitignore create mode 100644 .php_cs.dist create mode 100644 .travis.yml create mode 100644 CONTRIBUTING.md create mode 100644 LICENSE create mode 100644 README.md create mode 100644 appveyor.yml create mode 100644 composer.json create mode 100644 examples/client.php create mode 100644 examples/server.php create mode 100644 lib/IpcServer.php create mode 100644 lib/PendingAcceptError.php create mode 100644 lib/Sync/Channel.php create mode 100644 lib/Sync/ChannelException.php create mode 100644 lib/Sync/ChannelParser.php create mode 100644 lib/Sync/ChannelledSocket.php create mode 100644 lib/Sync/ChannelledStream.php create mode 100644 lib/Sync/ExitFailure.php create mode 100644 lib/Sync/ExitResult.php create mode 100644 lib/Sync/ExitSuccess.php create mode 100644 lib/Sync/Internal/ParcelStorage.php create mode 100644 lib/Sync/PanicError.php create mode 100644 lib/Sync/Parcel.php create mode 100644 lib/Sync/SerializationException.php create mode 100644 lib/Sync/SharedMemoryException.php create mode 100644 lib/Sync/SharedMemoryParcel.php create mode 100644 lib/Sync/SynchronizationError.php create mode 100644 lib/Sync/ThreadedParcel.php create mode 100644 lib/functions.php create mode 100644 phpunit.xml.dist diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bef310b --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.vscode +build +composer.lock +phpunit.xml +vendor +.php_cs.cache +coverage diff --git a/.php_cs.dist b/.php_cs.dist new file mode 100644 index 0000000..8d02bce --- /dev/null +++ b/.php_cs.dist @@ -0,0 +1,13 @@ +getFinder() + ->in(__DIR__ . '/examples') + ->in(__DIR__ . '/lib') + ->in(__DIR__ . '/test'); + +$cacheDir = getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__; + +$config->setCacheFile($cacheDir . '/.php_cs.cache'); + +return $config; diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..71b9734 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,39 @@ +sudo: false + +language: php + +php: + - 7.0 + - 7.1 + - 7.2 + - 7.3 + - nightly + +matrix: + allow_failures: + - php: nightly + fast_finish: true + +env: + - AMP_DEBUG=true + +before_install: + - phpenv config-rm xdebug.ini || echo "No xdebug config." + +install: + - composer update -n --prefer-dist + - wget https://github.com/php-coveralls/php-coveralls/releases/download/v1.0.2/coveralls.phar + - chmod +x coveralls.phar + +script: + - vendor/bin/phpunit --coverage-text --coverage-clover build/logs/clover.xml + - PHP_CS_FIXER_IGNORE_ENV=1 php vendor/bin/php-cs-fixer --diff --dry-run -v fix + +after_script: + - ./coveralls.phar -v + +cache: + directories: + - $HOME/.composer/cache + - $HOME/.php-cs-fixer + - $HOME/.local diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..c608fce --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,41 @@ +## Submitting useful bug reports + +Please search existing issues first to make sure this is not a duplicate. +Every issue report has a cost for the developers required to field it; be +respectful of others' time and ensure your report isn't spurious prior to +submission. Please adhere to [sound bug reporting principles](http://www.chiark.greenend.org.uk/~sgtatham/bugs.html). + +## Development ideology + +Truths which we believe to be self-evident: + +- **It's an asynchronous world.** Be wary of anything that undermines + async principles. + +- **The answer is not more options.** If you feel compelled to expose + new preferences to the user it's very possible you've made a wrong + turn somewhere. + +- **There are no power users.** The idea that some users "understand" + concepts better than others has proven to be, for the most part, false. + If anything, "power users" are more dangerous than the rest, and we + should avoid exposing dangerous functionality to them. + +## Code style + +The amphp project adheres to the [PSR-2 style guide](https://github.com/php-fig/fig-standards/blob/master/accepted/PSR-2-coding-style-guide.md +). + +To apply code standards you can run `php-cs-fixer` with following composer command: + +```bash +composer code-style +``` + +## Running the tests + +Run the test suite from root directory: + +```bash +composer test +``` diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..87a8244 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2015 amphp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..4e55d28 --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +# template + +[![Build Status](https://img.shields.io/travis/amphp/template/master.svg?style=flat-square)](https://travis-ci.org/amphp/template) +[![CoverageStatus](https://img.shields.io/coveralls/amphp/template/master.svg?style=flat-square)](https://coveralls.io/github/amphp/template?branch=master) +![License](https://img.shields.io/badge/license-MIT-blue.svg?style=flat-square) + +`amphp/template` provides a template for AMPHP repos. + +## Installation + +```bash +composer require amphp/template +``` + +## Example + +```php + appveyor.yml + +init: + - SET PATH=C:\Program Files\OpenSSL;c:\tools\php74;%PATH% + - SET COMPOSER_NO_INTERACTION=1 + - SET PHP=1 + - SET ANSICON=121x90 (121x90) + +install: + - IF EXIST c:\tools\php74 (SET PHP=0) + - IF %PHP%==1 sc config wuauserv start= auto + - IF %PHP%==1 net start wuauserv + - IF %PHP%==1 cinst -y OpenSSL.Light + - IF %PHP%==1 cinst -y php + - cd c:\tools\php74 + - IF %PHP%==1 copy php.ini-production php.ini /Y + - IF %PHP%==1 echo date.timezone="UTC" >> php.ini + - IF %PHP%==1 echo extension_dir=ext >> php.ini + - IF %PHP%==1 echo extension=php_openssl.dll >> php.ini + - IF %PHP%==1 echo extension=php_mbstring.dll >> php.ini + - IF %PHP%==1 echo extension=php_fileinfo.dll >> php.ini + - cd c:\projects\amphp + - appveyor DownloadFile https://getcomposer.org/composer.phar + - php composer.phar install --prefer-dist --no-progress + +test_script: + - cd c:\projects\amphp + - vendor/bin/phpunit --colors=always diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..065ef25 --- /dev/null +++ b/composer.json @@ -0,0 +1,59 @@ +{ + "name": "amphp/ipc", + "description": "IPC component for Amp.", + "keywords": [ + "asynchronous", + "async", + "concurrent", + "multi-threading", + "multi-processing" + ], + "homepage": "https://github.com/amphp/ipc", + "license": "MIT", + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Daniil Gentili", + "email": "daniil@daniil.it" + }, + { + "name": "Stephen Coakley", + "email": "me@stephencoakley.com" + } + ], + "require": { + "php": ">=7.1", + "amphp/byte-stream": "^1.7" + }, + "require-dev": { + "amphp/amp": "^2.4", + "phpunit/phpunit": "^8 || ^7", + "amphp/phpunit-util": "^1.1", + "amphp/php-cs-fixer-config": "dev-master" + }, + "autoload": { + "psr-4": { + "Amp\\Ipc\\": "lib" + }, + "files": [ + "lib/functions.php" + ] + }, + "autoload-dev": { + "psr-4": { + "Amp\\Ipc\\Test\\": "test" + } + }, + "scripts": { + "check": [ + "@cs", + "@test" + ], + "cs": "php-cs-fixer fix -v --diff --dry-run", + "cs-fix": "php-cs-fixer fix -v --diff", + "test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit --coverage-text" + } +} diff --git a/examples/client.php b/examples/client.php new file mode 100644 index 0000000..bceb587 --- /dev/null +++ b/examples/client.php @@ -0,0 +1,25 @@ +receive()) { + echo "Received $payload".PHP_EOL; + $socket->close(); + } + echo "Closed connection".PHP_EOL; + }; + + $channel = yield connect(sys_get_temp_dir().'/test'); + asyncCall($clientHandler, $channel); + yield $channel->send('ping'); +}); \ No newline at end of file diff --git a/examples/server.php b/examples/server.php new file mode 100644 index 0000000..094c7e3 --- /dev/null +++ b/examples/server.php @@ -0,0 +1,28 @@ +receive()) { + echo "Received $payload".PHP_EOL; + if ($payload === 'ping') { + yield $socket->send('pong'); + } + } + echo "Closed connection".PHP_EOL; + }; + + $server = new IpcServer(sys_get_temp_dir().'/test'); + while ($socket = yield $server->accept()) { + asyncCall($clientHandler, $socket); + } +}); diff --git a/lib/IpcServer.php b/lib/IpcServer.php new file mode 100644 index 0000000..c9bb439 --- /dev/null +++ b/lib/IpcServer.php @@ -0,0 +1,216 @@ +uri = $uri; + + + $isWindows = \strncasecmp(\PHP_OS, "WIN", 3) === 0; + + if ($isWindows) { + if ($useFIFO) { + throw new \RuntimeException("Cannot use FIFOs on windows"); + } + $listenUri = "tcp://127.0.0.1:0"; + } else { + $listenUri = "unix://".$uri; + } + + if (!$useFIFO) { + $this->server = \stream_socket_server($listenUri, $errno, $errstr, \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN); + } + + $fifo = false; + if (!$this->server) { + if ($isWindows) { + throw new \RuntimeException(\sprintf("Could not create IPC server: (Errno: %d) %s", $errno, $errstr)); + } + if (!\posix_mkfifo($uri, 0777)) { + throw new \RuntimeException(\sprintf("Could not create the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr)); + } + if (!$this->server = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader + throw new \RuntimeException(\sprintf("Could not connect to the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr)); + } + \stream_set_blocking($this->server, false); + $fifo = true; + } + + if ($isWindows) { + $name = \stream_socket_get_name($this->server, false); + $port = \substr($name, \strrpos($name, ":") + 1); + \file_put_contents($this->uri, "tcp://127.0.0.1:".$port); + } + + $acceptor = &$this->acceptor; + $this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$acceptor, $fifo): void { + if ($fifo) { + $length = \unpack('v', \fread($server, 2))[1]; + if (!$length) { + return; // Could not accept, wrong length read + } + + $prefix = \fread($server, $length); + $sockets = [ + $prefix.'1', + $prefix.'2', + ]; + + foreach ($sockets as $k => &$socket) { + if (@\filetype($socket) !== 'fifo') { + if ($k) { + \fclose($sockets[0]); + } + return; // Is not a FIFO + } + + // Open in either read or write mode to send a close signal when done + if (!$socket = \fopen($socket, $k ? 'w' : 'r')) { + if ($k) { + \fclose($sockets[0]); + } + return; // Could not open fifo + } + } + $channel = new ChannelledSocket(...$sockets); + } else { + // Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure. + if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking. + return; // Accepting client failed. + } + $channel = new ChannelledSocket($client, $client); + } + + $deferred = $acceptor; + $acceptor = null; + + \assert($deferred !== null); + + $deferred->resolve($channel); + + if (!$acceptor) { + Loop::disable($watcher); + } + }); + + Loop::disable($this->watcher); + } + + public function __destruct() + { + $this->close(); + } + + /** + * @return Promise + * + * @throws PendingAcceptError If another accept request is pending. + */ + public function accept(): Promise + { + if ($this->acceptor) { + throw new PendingAcceptError; + } + + if (!$this->server) { + return new Success(); // Resolve with null when server is closed. + } + + $this->acceptor = new Deferred; + Loop::enable($this->watcher); + + return $this->acceptor->promise(); + } + + /** + * Closes the server and stops accepting connections. Any socket clients accepted will not be closed. + */ + public function close(): void + { + Loop::cancel($this->watcher); + + if ($this->acceptor) { + $this->acceptor->resolve(); + $this->acceptor = null; + } + + if ($this->server) { + \fclose($this->server); + $this->server = null; + } + if ($this->uri !== null) { + @\unlink($this->uri); + $this->uri = null; + } + } + + /** + * @return bool + */ + public function isClosed(): bool + { + return $this->server === null; + } + + /** + * References the accept watcher. + * + * @see Loop::reference() + */ + final public function reference(): void + { + Loop::reference($this->watcher); + } + + /** + * Unreferences the accept watcher. + * + * @see Loop::unreference() + */ + final public function unreference(): void + { + Loop::unreference($this->watcher); + } + + /** + * Get endpoint to which clients should connect. + * + * @return string + */ + public function getUri(): string + { + return $this->uri; + } +} diff --git a/lib/PendingAcceptError.php b/lib/PendingAcceptError.php new file mode 100644 index 0000000..e98caf1 --- /dev/null +++ b/lib/PendingAcceptError.php @@ -0,0 +1,17 @@ + + * + * @throws \Amp\Ipc\Context\StatusError Thrown if the context has not been started. + * @throws \Amp\Ipc\Sync\SynchronizationError If the context has not been started or the context + * unexpectedly ends. + * @throws \Amp\Ipc\Sync\ChannelException If receiving from the channel fails. + * @throws \Amp\Ipc\Sync\SerializationException If unserializing the data fails. + */ + public function receive(): Promise; + + /** + * @param mixed $data + * + * @return \Amp\Promise Resolves with the number of bytes sent on the channel. + * + * @throws \Amp\Ipc\Context\StatusError Thrown if the context has not been started. + * @throws \Amp\Ipc\Sync\SynchronizationError If the context has not been started or the context + * unexpectedly ends. + * @throws \Amp\Ipc\Sync\ChannelException If sending on the channel fails. + * @throws \Error If an ExitResult object is given. + * @throws \Amp\Ipc\Sync\SerializationException If serializing the data fails. + */ + public function send($data): Promise; +} diff --git a/lib/Sync/ChannelException.php b/lib/Sync/ChannelException.php new file mode 100644 index 0000000..68be3ff --- /dev/null +++ b/lib/Sync/ChannelException.php @@ -0,0 +1,7 @@ +channel = new ChannelledStream( + $this->read = new ResourceInputStream($read), + $this->write = new ResourceOutputStream($write) + ); + } + + /** + * {@inheritdoc} + */ + public function receive(): Promise + { + return $this->channel->receive(); + } + + /** + * {@inheritdoc} + */ + public function send($data): Promise + { + return $this->channel->send($data); + } + + public function unreference() + { + $this->read->unreference(); + } + + public function reference() + { + $this->read->reference(); + } + + /** + * Closes the read and write resource streams. + */ + public function close() + { + $this->read->close(); + $this->write->close(); + } +} diff --git a/lib/Sync/ChannelledStream.php b/lib/Sync/ChannelledStream.php new file mode 100644 index 0000000..59a5852 --- /dev/null +++ b/lib/Sync/ChannelledStream.php @@ -0,0 +1,81 @@ +read = $read; + $this->write = $write; + $this->received = new \SplQueue; + $this->parser = new ChannelParser([$this->received, 'push']); + } + + /** + * {@inheritdoc} + */ + public function send($data): Promise + { + return call(function () use ($data) { + try { + return yield $this->write->write($this->parser->encode($data)); + } catch (StreamException $exception) { + throw new ChannelException("Sending on the channel failed. Did the context die?", 0, $exception); + } + }); + } + + /** + * {@inheritdoc} + */ + public function receive(): Promise + { + return call(function () { + while ($this->received->isEmpty()) { + try { + $chunk = yield $this->read->read(); + } catch (StreamException $exception) { + throw new ChannelException("Reading from the channel failed. Did the context die?", 0, $exception); + } + + if ($chunk === null) { + return null; + } + + $this->parser->push($chunk); + } + + return $this->received->shift(); + }); + } +} diff --git a/lib/Sync/ExitFailure.php b/lib/Sync/ExitFailure.php new file mode 100644 index 0000000..1c68e70 --- /dev/null +++ b/lib/Sync/ExitFailure.php @@ -0,0 +1,60 @@ +type = \get_class($exception); + $this->message = $exception->getMessage(); + $this->code = $exception->getCode(); + $this->trace = $exception->getTraceAsString(); + + if ($previous = $exception->getPrevious()) { + $this->previous = new self($previous); + } + } + + /** + * {@inheritdoc} + */ + public function getResult() + { + throw $this->createException(); + } + + private function createException(): PanicError + { + $previous = $this->previous ? $this->previous->createException() : null; + + return new PanicError( + $this->type, + \sprintf( + 'Uncaught %s in worker with message "%s" and code "%s"; use %s::getPanicTrace() ' + . 'for the stack trace in the context', + $this->type, + $this->message, + $this->code, + PanicError::class + ), + $this->trace, + $previous + ); + } +} diff --git a/lib/Sync/ExitResult.php b/lib/Sync/ExitResult.php new file mode 100644 index 0000000..7b387ba --- /dev/null +++ b/lib/Sync/ExitResult.php @@ -0,0 +1,13 @@ +result = $result; + } + + /** + * {@inheritdoc} + */ + public function getResult() + { + return $this->result; + } +} diff --git a/lib/Sync/Internal/ParcelStorage.php b/lib/Sync/Internal/ParcelStorage.php new file mode 100644 index 0000000..6d0906e --- /dev/null +++ b/lib/Sync/Internal/ParcelStorage.php @@ -0,0 +1,33 @@ +value = $value; + } + + /** + * @return mixed + */ + public function get() + { + return $this->value; + } + + /** + * @param mixed $value + */ + public function set($value) + { + $this->value = $value; + } +} diff --git a/lib/Sync/PanicError.php b/lib/Sync/PanicError.php new file mode 100644 index 0000000..8d79a5d --- /dev/null +++ b/lib/Sync/PanicError.php @@ -0,0 +1,48 @@ +name = $name; + $this->trace = $trace; + } + + /** + * Returns the class name of the uncaught exception. + * + * @return string + */ + public function getName(): string + { + return $this->name; + } + + /** + * Gets the stack trace at the point the panic occurred. + * + * @return string + */ + public function getPanicTrace(): string + { + return $this->trace; + } +} diff --git a/lib/Sync/Parcel.php b/lib/Sync/Parcel.php new file mode 100644 index 0000000..f5cfb7c --- /dev/null +++ b/lib/Sync/Parcel.php @@ -0,0 +1,38 @@ + Resolves with the return value of $callback or fails if $callback + * throws an exception. + */ + public function synchronized(callable $callback): Promise; + + /** + * @return \Amp\Promise A promise for the value inside the parcel. + */ + public function unwrap(): Promise; +} diff --git a/lib/Sync/SerializationException.php b/lib/Sync/SerializationException.php new file mode 100644 index 0000000..bc40aae --- /dev/null +++ b/lib/Sync/SerializationException.php @@ -0,0 +1,7 @@ +init($value, $size, $permissions); + return $parcel; + } + + /** + * @param string $id + * + * @return \Amp\Ipc\Sync\SharedMemoryParcel + */ + public static function use(string $id): self + { + $parcel = new self($id); + $parcel->open(); + return $parcel; + } + + /** + * Creates a new local object container. + * + * The object given will be assigned a new object ID and will have a + * reference to it stored in memory local to the thread. + * + * @param mixed $value The value to store in the container. + * @param int $size The number of bytes to allocate for the object. + * If not specified defaults to 16384 bytes. + * @param int $permissions The access permissions to set for the object. + * If not specified defaults to 0600. + */ + private function __construct(string $id) + { + if (!\extension_loaded("shmop")) { + throw new \Error(__CLASS__ . " requires the shmop extension."); + } + + $this->id = $id; + $this->key = self::makeKey($this->id); + } + + /** + * @param mixed $value + * @param int $size + * @param int $permissions + */ + private function init($value, int $size = 8192, int $permissions = 0600) + { + $this->semaphore = PosixSemaphore::create($this->id, 1); + $this->initializer = \getmypid(); + + $this->memOpen($this->key, 'n', $permissions, $size + self::MEM_DATA_OFFSET); + $this->setHeader(self::STATE_ALLOCATED, 0, $permissions); + $this->wrap($value); + } + + private function open() + { + $this->semaphore = PosixSemaphore::use($this->id); + $this->memOpen($this->key, 'w', 0, 0); + } + + /** + * Checks if the object has been freed. + * + * Note that this does not check if the object has been destroyed; it only + * checks if this handle has freed its reference to the object. + * + * @return bool True if the object is freed, otherwise false. + */ + private function isFreed(): bool + { + // If we are no longer connected to the memory segment, check if it has + // been invalidated. + if ($this->handle !== null) { + $this->handleMovedMemory(); + $header = $this->getHeader(); + return $header['state'] === static::STATE_FREED; + } + + return true; + } + + /** + * {@inheritdoc} + */ + public function unwrap(): Promise + { + if ($this->isFreed()) { + return new Failure(new SharedMemoryException('The object has already been freed.')); + } + + $header = $this->getHeader(); + + // Make sure the header is in a valid state and format. + if ($header['state'] !== self::STATE_ALLOCATED || $header['size'] <= 0) { + new Failure(new SharedMemoryException('Shared object memory is corrupt.')); + } + + // Read the actual value data from memory and unserialize it. + $data = $this->memGet(self::MEM_DATA_OFFSET, $header['size']); + return new Success(\unserialize($data)); + } + + /** + * If the value requires more memory to store than currently allocated, a + * new shared memory segment will be allocated with a larger size to store + * the value in. The previous memory segment will be cleaned up and marked + * for deletion. Other processes and threads will be notified of the new + * memory segment on the next read attempt. Once all running processes and + * threads disconnect from the old segment, it will be freed by the OS. + */ + protected function wrap($value) + { + if ($this->isFreed()) { + throw new SharedMemoryException('The object has already been freed.'); + } + + $serialized = \serialize($value); + $size = \strlen($serialized); + $header = $this->getHeader(); + + /* If we run out of space, we need to allocate a new shared memory + segment that is larger than the current one. To coordinate with other + processes, we will leave a message in the old segment that the segment + has moved and along with the new key. The old segment will be discarded + automatically after all other processes notice the change and close + the old handle. + */ + if (\shmop_size($this->handle) < $size + self::MEM_DATA_OFFSET) { + $this->key = $this->key < 0xffffffff ? $this->key + 1 : \mt_rand(0x10, 0xfffffffe); + $this->setHeader(self::STATE_MOVED, $this->key, 0); + + $this->memDelete(); + \shmop_close($this->handle); + + $this->memOpen($this->key, 'n', $header['permissions'], $size * 2); + } + + // Rewrite the header and the serialized value to memory. + $this->setHeader(self::STATE_ALLOCATED, $size, $header['permissions']); + $this->memSet(self::MEM_DATA_OFFSET, $serialized); + } + + /** + * {@inheritdoc} + */ + public function synchronized(callable $callback): Promise + { + return call(function () use ($callback) { + /** @var \Amp\Sync\Lock $lock */ + $lock = yield $this->semaphore->acquire(); + + try { + $result = yield call($callback, yield $this->unwrap()); + + if ($result !== null) { + $this->wrap($result); + } + } finally { + $lock->release(); + } + + return $result; + }); + } + + + /** + * Frees the shared object from memory. + * + * The memory containing the shared value will be invalidated. When all + * process disconnect from the object, the shared memory block will be + * destroyed by the OS. + */ + public function __destruct() + { + if ($this->initializer === 0 || $this->initializer !== \getmypid()) { + return; + } + + if ($this->isFreed()) { + return; + } + + // Invalidate the memory block by setting its state to FREED. + $this->setHeader(static::STATE_FREED, 0, 0); + + // Request the block to be deleted, then close our local handle. + $this->memDelete(); + \shmop_close($this->handle); + $this->handle = null; + + $this->semaphore = null; + } + + /** + * Private method to prevent cloning. + */ + private function __clone() + { + } + + /** + * Private method to prevent serialization. + */ + private function __sleep() + { + } + + /** + * Updates the current memory segment handle, handling any moves made on the + * data. + */ + private function handleMovedMemory() + { + // Read from the memory block and handle moved blocks until we find the + // correct block. + while (true) { + $header = $this->getHeader(); + + // If the state is STATE_MOVED, the memory is stale and has been moved + // to a new location. Move handle and try to read again. + if ($header['state'] !== self::STATE_MOVED) { + break; + } + + \shmop_close($this->handle); + $this->key = $header['size']; + $this->memOpen($this->key, 'w', 0, 0); + } + } + + /** + * Reads and returns the data header at the current memory segment. + * + * @return array An associative array of header data. + */ + private function getHeader(): array + { + $data = $this->memGet(0, self::MEM_DATA_OFFSET); + return \unpack('Cstate/Lsize/Spermissions', $data); + } + + /** + * Sets the header data for the current memory segment. + * + * @param int $state An object state. + * @param int $size The size of the stored data, or other value. + * @param int $permissions The permissions mask on the memory segment. + */ + private function setHeader(int $state, int $size, int $permissions) + { + $header = \pack('CLS', $state, $size, $permissions); + $this->memSet(0, $header); + } + + /** + * Opens a shared memory handle. + * + * @param int $key The shared memory key. + * @param string $mode The mode to open the shared memory in. + * @param int $permissions Process permissions on the shared memory. + * @param int $size The size to crate the shared memory in bytes. + */ + private function memOpen(int $key, string $mode, int $permissions, int $size) + { + $this->handle = @\shmop_open($key, $mode, $permissions, $size); + if ($this->handle === false) { + throw new SharedMemoryException('Failed to create shared memory block.'); + } + } + + /** + * Reads binary data from shared memory. + * + * @param int $offset The offset to read from. + * @param int $size The number of bytes to read. + * + * @return string The binary data at the given offset. + */ + private function memGet(int $offset, int $size): string + { + $data = \shmop_read($this->handle, $offset, $size); + if ($data === false) { + throw new SharedMemoryException('Failed to read from shared memory block.'); + } + return $data; + } + + /** + * Writes binary data to shared memory. + * + * @param int $offset The offset to write to. + * @param string $data The binary data to write. + */ + private function memSet(int $offset, string $data) + { + if (!\shmop_write($this->handle, $data, $offset)) { + throw new SharedMemoryException('Failed to write to shared memory block.'); + } + } + + /** + * Requests the shared memory segment to be deleted. + */ + private function memDelete() + { + if (!\shmop_delete($this->handle)) { + throw new SharedMemoryException('Failed to discard shared memory block.'); + } + } + + private static function makeKey(string $id): int + { + return \abs(\unpack("l", \md5($id, true))[1]); + } +} diff --git a/lib/Sync/SynchronizationError.php b/lib/Sync/SynchronizationError.php new file mode 100644 index 0000000..63d91ed --- /dev/null +++ b/lib/Sync/SynchronizationError.php @@ -0,0 +1,7 @@ +mutex = new ThreadedMutex; + $this->storage = new Internal\ParcelStorage($value); + } + + /** + * {@inheritdoc} + */ + public function unwrap(): Promise + { + return new Success($this->storage->get()); + } + + /** + * @return \Amp\Promise + */ + public function synchronized(callable $callback): Promise + { + return call(function () use ($callback) { + /** @var \Amp\Sync\Lock $lock */ + $lock = yield $this->mutex->acquire(); + + try { + $result = yield call($callback, $this->storage->get()); + + if ($result !== null) { + $this->storage->set($result); + } + } finally { + $lock->release(); + } + + return $result; + }); + } +} diff --git a/lib/functions.php b/lib/functions.php new file mode 100644 index 0000000..1ff3fc2 --- /dev/null +++ b/lib/functions.php @@ -0,0 +1,78 @@ + + */ +function connect(string $uri): Promise +{ + return call(static function () use ($uri) { + $type = \filetype($uri); + if ($type === 'fifo') { + $suffix = \bin2hex(\random_bytes(10)); + $prefix = \sys_get_temp_dir()."/amp-".$suffix.".fifo"; + + if (\strlen($prefix) > 0xFFFF) { + \trigger_error("Prefix is too long!", E_USER_ERROR); + exit(1); + } + + $sockets = [ + $prefix."2", + $prefix."1", + ]; + + foreach ($sockets as $k => &$socket) { + if (!\posix_mkfifo($socket, 0777)) { + \trigger_error("Could not create FIFO client socket", E_USER_ERROR); + exit(1); + } + + \register_shutdown_function(static function () use ($socket): void { + @\unlink($socket); + }); + + if (!$socket = \fopen($socket, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader + \trigger_error("Could not open FIFO client socket", E_USER_ERROR); + exit(1); + } + } + + if (!$tempSocket = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader + \trigger_error("Could not connect to FIFO server", E_USER_ERROR); + exit(1); + } + \stream_set_blocking($tempSocket, false); + \stream_set_write_buffer($tempSocket, 0); + + if (!\fwrite($tempSocket, \pack('v', \strlen($prefix)).$prefix)) { + \trigger_error("Failure sending request to FIFO server", E_USER_ERROR); + exit(1); + } + \fclose($tempSocket); + $tempSocket = null; + + return new ChannelledSocket(...$sockets); + } + if ($type === 'file') { + $uri = \file_get_contents($uri); + } else { + $uri = "unix://$uri"; + } + if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) { + \trigger_error("Could not connect to IPC socket", E_USER_ERROR); + exit(1); + } + return new ChannelledSocket($socket, $socket); + }); +} diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..ce62c46 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,28 @@ + + + + + test + + + + + lib + + + + + +