1
0
mirror of https://github.com/danog/ipc.git synced 2024-11-26 12:04:51 +01:00

First commit

This commit is contained in:
Daniil Gentili 2020-02-14 20:31:11 +01:00
commit 219b05c9da
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
30 changed files with 1594 additions and 0 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
.vscode
build
composer.lock
phpunit.xml
vendor
.php_cs.cache
coverage

13
.php_cs.dist Normal file
View File

@ -0,0 +1,13 @@
<?php
$config = new Amp\CodeStyle\Config();
$config->getFinder()
->in(__DIR__ . '/examples')
->in(__DIR__ . '/lib')
->in(__DIR__ . '/test');
$cacheDir = getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__;
$config->setCacheFile($cacheDir . '/.php_cs.cache');
return $config;

39
.travis.yml Normal file
View File

@ -0,0 +1,39 @@
sudo: false
language: php
php:
- 7.0
- 7.1
- 7.2
- 7.3
- nightly
matrix:
allow_failures:
- php: nightly
fast_finish: true
env:
- AMP_DEBUG=true
before_install:
- phpenv config-rm xdebug.ini || echo "No xdebug config."
install:
- composer update -n --prefer-dist
- wget https://github.com/php-coveralls/php-coveralls/releases/download/v1.0.2/coveralls.phar
- chmod +x coveralls.phar
script:
- vendor/bin/phpunit --coverage-text --coverage-clover build/logs/clover.xml
- PHP_CS_FIXER_IGNORE_ENV=1 php vendor/bin/php-cs-fixer --diff --dry-run -v fix
after_script:
- ./coveralls.phar -v
cache:
directories:
- $HOME/.composer/cache
- $HOME/.php-cs-fixer
- $HOME/.local

41
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,41 @@
## Submitting useful bug reports
Please search existing issues first to make sure this is not a duplicate.
Every issue report has a cost for the developers required to field it; be
respectful of others' time and ensure your report isn't spurious prior to
submission. Please adhere to [sound bug reporting principles](http://www.chiark.greenend.org.uk/~sgtatham/bugs.html).
## Development ideology
Truths which we believe to be self-evident:
- **It's an asynchronous world.** Be wary of anything that undermines
async principles.
- **The answer is not more options.** If you feel compelled to expose
new preferences to the user it's very possible you've made a wrong
turn somewhere.
- **There are no power users.** The idea that some users "understand"
concepts better than others has proven to be, for the most part, false.
If anything, "power users" are more dangerous than the rest, and we
should avoid exposing dangerous functionality to them.
## Code style
The amphp project adheres to the [PSR-2 style guide](https://github.com/php-fig/fig-standards/blob/master/accepted/PSR-2-coding-style-guide.md
).
To apply code standards you can run `php-cs-fixer` with following composer command:
```bash
composer code-style
```
## Running the tests
Run the test suite from root directory:
```bash
composer test
```

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2015 amphp
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

22
README.md Normal file
View File

@ -0,0 +1,22 @@
# template
[![Build Status](https://img.shields.io/travis/amphp/template/master.svg?style=flat-square)](https://travis-ci.org/amphp/template)
[![CoverageStatus](https://img.shields.io/coveralls/amphp/template/master.svg?style=flat-square)](https://coveralls.io/github/amphp/template?branch=master)
![License](https://img.shields.io/badge/license-MIT-blue.svg?style=flat-square)
`amphp/template` provides a template for AMPHP repos.
## Installation
```bash
composer require amphp/template
```
## Example
```php
<?php
require 'vendor/autoload.php';
```

38
appveyor.yml Normal file
View File

@ -0,0 +1,38 @@
build: false
shallow_clone: false
platform:
- x86
- x64
clone_folder: c:\projects\amphp
cache:
- c:\tools\php74 -> appveyor.yml
init:
- SET PATH=C:\Program Files\OpenSSL;c:\tools\php74;%PATH%
- SET COMPOSER_NO_INTERACTION=1
- SET PHP=1
- SET ANSICON=121x90 (121x90)
install:
- IF EXIST c:\tools\php74 (SET PHP=0)
- IF %PHP%==1 sc config wuauserv start= auto
- IF %PHP%==1 net start wuauserv
- IF %PHP%==1 cinst -y OpenSSL.Light
- IF %PHP%==1 cinst -y php
- cd c:\tools\php74
- IF %PHP%==1 copy php.ini-production php.ini /Y
- IF %PHP%==1 echo date.timezone="UTC" >> php.ini
- IF %PHP%==1 echo extension_dir=ext >> php.ini
- IF %PHP%==1 echo extension=php_openssl.dll >> php.ini
- IF %PHP%==1 echo extension=php_mbstring.dll >> php.ini
- IF %PHP%==1 echo extension=php_fileinfo.dll >> php.ini
- cd c:\projects\amphp
- appveyor DownloadFile https://getcomposer.org/composer.phar
- php composer.phar install --prefer-dist --no-progress
test_script:
- cd c:\projects\amphp
- vendor/bin/phpunit --colors=always

59
composer.json Normal file
View File

@ -0,0 +1,59 @@
{
"name": "amphp/ipc",
"description": "IPC component for Amp.",
"keywords": [
"asynchronous",
"async",
"concurrent",
"multi-threading",
"multi-processing"
],
"homepage": "https://github.com/amphp/ipc",
"license": "MIT",
"authors": [
{
"name": "Aaron Piotrowski",
"email": "aaron@trowski.com"
},
{
"name": "Daniil Gentili",
"email": "daniil@daniil.it"
},
{
"name": "Stephen Coakley",
"email": "me@stephencoakley.com"
}
],
"require": {
"php": ">=7.1",
"amphp/byte-stream": "^1.7"
},
"require-dev": {
"amphp/amp": "^2.4",
"phpunit/phpunit": "^8 || ^7",
"amphp/phpunit-util": "^1.1",
"amphp/php-cs-fixer-config": "dev-master"
},
"autoload": {
"psr-4": {
"Amp\\Ipc\\": "lib"
},
"files": [
"lib/functions.php"
]
},
"autoload-dev": {
"psr-4": {
"Amp\\Ipc\\Test\\": "test"
}
},
"scripts": {
"check": [
"@cs",
"@test"
],
"cs": "php-cs-fixer fix -v --diff --dry-run",
"cs-fix": "php-cs-fixer fix -v --diff",
"test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit --coverage-text"
}
}

25
examples/client.php Normal file
View File

@ -0,0 +1,25 @@
<?php
require 'vendor/autoload.php';
use Amp\Loop;
use Amp\Ipc\Sync\ChannelledSocket;
use function Amp\asyncCall;
use function Amp\Ipc\connect;
Loop::run(static function () {
$clientHandler = function (ChannelledSocket $socket) {
echo "Created connection.".PHP_EOL;
while ($payload = yield $socket->receive()) {
echo "Received $payload".PHP_EOL;
$socket->close();
}
echo "Closed connection".PHP_EOL;
};
$channel = yield connect(sys_get_temp_dir().'/test');
asyncCall($clientHandler, $channel);
yield $channel->send('ping');
});

28
examples/server.php Normal file
View File

@ -0,0 +1,28 @@
<?php
require 'vendor/autoload.php';
use Amp\Ipc\IpcServer;
use Amp\Loop;
use Amp\Ipc\Sync\ChannelledSocket;
use function Amp\asyncCall;
Loop::run(static function () {
$clientHandler = function (ChannelledSocket $socket) {
echo "Accepted connection".PHP_EOL;
while ($payload = yield $socket->receive()) {
echo "Received $payload".PHP_EOL;
if ($payload === 'ping') {
yield $socket->send('pong');
}
}
echo "Closed connection".PHP_EOL;
};
$server = new IpcServer(sys_get_temp_dir().'/test');
while ($socket = yield $server->accept()) {
asyncCall($clientHandler, $socket);
}
});

216
lib/IpcServer.php Normal file
View File

@ -0,0 +1,216 @@
<?php
namespace Amp\Ipc;
use Amp\Deferred;
use Amp\Loop;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Promise;
use Amp\Success;
class IpcServer
{
/** @var resource|null */
private $server;
/** @var Deferred */
private $acceptor;
/** @var string|null */
private $watcher;
/** @var string|null */
private $uri;
/**
* @param string $uri Local endpoint on which to listen for requests
* @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes)
*/
public function __construct(string $uri = '', bool $useFIFO = false)
{
if (!$uri) {
$suffix = \bin2hex(\random_bytes(10));
$uri = \sys_get_temp_dir()."/amp-ipc-".$suffix.".sock";
}
if (\file_exists($uri)) {
@\unlink($uri);
}
$this->uri = $uri;
$isWindows = \strncasecmp(\PHP_OS, "WIN", 3) === 0;
if ($isWindows) {
if ($useFIFO) {
throw new \RuntimeException("Cannot use FIFOs on windows");
}
$listenUri = "tcp://127.0.0.1:0";
} else {
$listenUri = "unix://".$uri;
}
if (!$useFIFO) {
$this->server = \stream_socket_server($listenUri, $errno, $errstr, \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN);
}
$fifo = false;
if (!$this->server) {
if ($isWindows) {
throw new \RuntimeException(\sprintf("Could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
if (!\posix_mkfifo($uri, 0777)) {
throw new \RuntimeException(\sprintf("Could not create the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
if (!$this->server = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
throw new \RuntimeException(\sprintf("Could not connect to the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
\stream_set_blocking($this->server, false);
$fifo = true;
}
if ($isWindows) {
$name = \stream_socket_get_name($this->server, false);
$port = \substr($name, \strrpos($name, ":") + 1);
\file_put_contents($this->uri, "tcp://127.0.0.1:".$port);
}
$acceptor = &$this->acceptor;
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$acceptor, $fifo): void {
if ($fifo) {
$length = \unpack('v', \fread($server, 2))[1];
if (!$length) {
return; // Could not accept, wrong length read
}
$prefix = \fread($server, $length);
$sockets = [
$prefix.'1',
$prefix.'2',
];
foreach ($sockets as $k => &$socket) {
if (@\filetype($socket) !== 'fifo') {
if ($k) {
\fclose($sockets[0]);
}
return; // Is not a FIFO
}
// Open in either read or write mode to send a close signal when done
if (!$socket = \fopen($socket, $k ? 'w' : 'r')) {
if ($k) {
\fclose($sockets[0]);
}
return; // Could not open fifo
}
}
$channel = new ChannelledSocket(...$sockets);
} else {
// Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure.
if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking.
return; // Accepting client failed.
}
$channel = new ChannelledSocket($client, $client);
}
$deferred = $acceptor;
$acceptor = null;
\assert($deferred !== null);
$deferred->resolve($channel);
if (!$acceptor) {
Loop::disable($watcher);
}
});
Loop::disable($this->watcher);
}
public function __destruct()
{
$this->close();
}
/**
* @return Promise<ChanneledSocket|null>
*
* @throws PendingAcceptError If another accept request is pending.
*/
public function accept(): Promise
{
if ($this->acceptor) {
throw new PendingAcceptError;
}
if (!$this->server) {
return new Success(); // Resolve with null when server is closed.
}
$this->acceptor = new Deferred;
Loop::enable($this->watcher);
return $this->acceptor->promise();
}
/**
* Closes the server and stops accepting connections. Any socket clients accepted will not be closed.
*/
public function close(): void
{
Loop::cancel($this->watcher);
if ($this->acceptor) {
$this->acceptor->resolve();
$this->acceptor = null;
}
if ($this->server) {
\fclose($this->server);
$this->server = null;
}
if ($this->uri !== null) {
@\unlink($this->uri);
$this->uri = null;
}
}
/**
* @return bool
*/
public function isClosed(): bool
{
return $this->server === null;
}
/**
* References the accept watcher.
*
* @see Loop::reference()
*/
final public function reference(): void
{
Loop::reference($this->watcher);
}
/**
* Unreferences the accept watcher.
*
* @see Loop::unreference()
*/
final public function unreference(): void
{
Loop::unreference($this->watcher);
}
/**
* Get endpoint to which clients should connect.
*
* @return string
*/
public function getUri(): string
{
return $this->uri;
}
}

View File

@ -0,0 +1,17 @@
<?php
namespace Amp\Ipc;
/**
* Thrown in case a second read operation is attempted while another read operation is still pending.
*/
final class PendingAcceptError extends \Error
{
public function __construct(
string $message = 'The previous accept operation must complete before accept can be called again',
int $code = 0,
\Throwable $previous = null
) {
parent::__construct($message, $code, $previous);
}
}

36
lib/Sync/Channel.php Normal file
View File

@ -0,0 +1,36 @@
<?php
namespace Amp\Ipc\Sync;
use Amp\Promise;
/**
* Interface for sending messages between execution contexts.
*/
interface Channel
{
/**
* @return \Amp\Promise<mixed>
*
* @throws \Amp\Ipc\Context\StatusError Thrown if the context has not been started.
* @throws \Amp\Ipc\Sync\SynchronizationError If the context has not been started or the context
* unexpectedly ends.
* @throws \Amp\Ipc\Sync\ChannelException If receiving from the channel fails.
* @throws \Amp\Ipc\Sync\SerializationException If unserializing the data fails.
*/
public function receive(): Promise;
/**
* @param mixed $data
*
* @return \Amp\Promise<int> Resolves with the number of bytes sent on the channel.
*
* @throws \Amp\Ipc\Context\StatusError Thrown if the context has not been started.
* @throws \Amp\Ipc\Sync\SynchronizationError If the context has not been started or the context
* unexpectedly ends.
* @throws \Amp\Ipc\Sync\ChannelException If sending on the channel fails.
* @throws \Error If an ExitResult object is given.
* @throws \Amp\Ipc\Sync\SerializationException If serializing the data fails.
*/
public function send($data): Promise;
}

View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Sync;
class ChannelException extends \Exception
{
}

View File

@ -0,0 +1,88 @@
<?php
namespace Amp\Ipc\Sync;
use Amp\Parser\Parser;
final class ChannelParser extends Parser
{
const HEADER_LENGTH = 5;
/**
* @param callable(mixed $data) Callback invoked when data is parsed.
*/
public function __construct(callable $callback)
{
parent::__construct(self::parser($callback));
}
/**
* @param mixed $data Data to encode to send over a channel.
*
* @return string Encoded data that can be parsed by this class.
*
* @throws \Amp\Ipc\Sync\SerializationException
*/
public function encode($data): string
{
try {
$data = \serialize($data);
} catch (\Throwable $exception) {
throw new SerializationException(
"The given data cannot be sent because it is not serializable.",
0,
$exception
);
}
return \pack("CL", 0, \strlen($data)) . $data;
}
/**
* @param callable $push
*
* @return \Generator
*
* @throws \Amp\Ipc\Sync\ChannelException
* @throws \Amp\Ipc\Sync\SerializationException
*/
private static function parser(callable $push): \Generator
{
while (true) {
$header = yield self::HEADER_LENGTH;
$data = \unpack("Cprefix/Llength", $header);
if ($data["prefix"] !== 0) {
$data = $header . yield;
throw new ChannelException("Invalid packet received: " . self::encodeUnprintableChars($data));
}
$data = yield $data["length"];
// Attempt to unserialize the received data.
try {
$result = \unserialize($data);
if ($result === false && $data !== \serialize(false)) {
throw new ChannelException("Received invalid data: " . self::encodeUnprintableChars($data));
}
} catch (\Throwable $exception) {
throw new SerializationException("Exception thrown when unserializing data", 0, $exception);
}
$push($result);
}
}
/**
* @param string $data Binary data.
*
* @return string Unprintable characters encoded as \x##.
*/
private static function encodeUnprintableChars(string $data): string
{
return \preg_replace_callback("/[^\x20-\x7e]/", function (array $matches) {
return "\\x" . \dechex(\ord($matches[0]));
}, $data);
}
}

View File

@ -0,0 +1,68 @@
<?php
namespace Amp\Ipc\Sync;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Promise;
final class ChannelledSocket implements Channel
{
/** @var ChannelledStream */
private $channel;
/** @var ResourceInputStream */
private $read;
/** @var ResourceOutputStream */
private $write;
/**
* @param resource $read Readable stream resource.
* @param resource $write Writable stream resource.
*
* @throws \Error If a stream resource is not given for $resource.
*/
public function __construct($read, $write)
{
$this->channel = new ChannelledStream(
$this->read = new ResourceInputStream($read),
$this->write = new ResourceOutputStream($write)
);
}
/**
* {@inheritdoc}
*/
public function receive(): Promise
{
return $this->channel->receive();
}
/**
* {@inheritdoc}
*/
public function send($data): Promise
{
return $this->channel->send($data);
}
public function unreference()
{
$this->read->unreference();
}
public function reference()
{
$this->read->reference();
}
/**
* Closes the read and write resource streams.
*/
public function close()
{
$this->read->close();
$this->write->close();
}
}

View File

@ -0,0 +1,81 @@
<?php
namespace Amp\Ipc\Sync;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\StreamException;
use Amp\Promise;
use function Amp\call;
/**
* An asynchronous channel for sending data between threads and processes.
*
* Supports full duplex read and write.
*/
final class ChannelledStream implements Channel
{
/** @var \Amp\ByteStream\InputStream */
private $read;
/** @var \Amp\ByteStream\OutputStream */
private $write;
/** @var \SplQueue */
private $received;
/** @var \Amp\Parser\Parser */
private $parser;
/**
* Creates a new channel from the given stream objects. Note that $read and $write can be the same object.
*
* @param \Amp\ByteStream\InputStream $read
* @param \Amp\ByteStream\OutputStream $write
*/
public function __construct(InputStream $read, OutputStream $write)
{
$this->read = $read;
$this->write = $write;
$this->received = new \SplQueue;
$this->parser = new ChannelParser([$this->received, 'push']);
}
/**
* {@inheritdoc}
*/
public function send($data): Promise
{
return call(function () use ($data) {
try {
return yield $this->write->write($this->parser->encode($data));
} catch (StreamException $exception) {
throw new ChannelException("Sending on the channel failed. Did the context die?", 0, $exception);
}
});
}
/**
* {@inheritdoc}
*/
public function receive(): Promise
{
return call(function () {
while ($this->received->isEmpty()) {
try {
$chunk = yield $this->read->read();
} catch (StreamException $exception) {
throw new ChannelException("Reading from the channel failed. Did the context die?", 0, $exception);
}
if ($chunk === null) {
return null;
}
$this->parser->push($chunk);
}
return $this->received->shift();
});
}
}

60
lib/Sync/ExitFailure.php Normal file
View File

@ -0,0 +1,60 @@
<?php
namespace Amp\Ipc\Sync;
final class ExitFailure implements ExitResult
{
/** @var string */
private $type;
/** @var string */
private $message;
/** @var int|string */
private $code;
/** @var array */
private $trace;
/** @var self|null */
private $previous;
public function __construct(\Throwable $exception)
{
$this->type = \get_class($exception);
$this->message = $exception->getMessage();
$this->code = $exception->getCode();
$this->trace = $exception->getTraceAsString();
if ($previous = $exception->getPrevious()) {
$this->previous = new self($previous);
}
}
/**
* {@inheritdoc}
*/
public function getResult()
{
throw $this->createException();
}
private function createException(): PanicError
{
$previous = $this->previous ? $this->previous->createException() : null;
return new PanicError(
$this->type,
\sprintf(
'Uncaught %s in worker with message "%s" and code "%s"; use %s::getPanicTrace() '
. 'for the stack trace in the context',
$this->type,
$this->message,
$this->code,
PanicError::class
),
$this->trace,
$previous
);
}
}

13
lib/Sync/ExitResult.php Normal file
View File

@ -0,0 +1,13 @@
<?php
namespace Amp\Ipc\Sync;
interface ExitResult
{
/**
* @return mixed Return value of the callable given to the execution context.
*
* @throws \Amp\Ipc\Sync\PanicError If the context exited with an uncaught exception.
*/
public function getResult();
}

22
lib/Sync/ExitSuccess.php Normal file
View File

@ -0,0 +1,22 @@
<?php
namespace Amp\Ipc\Sync;
final class ExitSuccess implements ExitResult
{
/** @var mixed */
private $result;
public function __construct($result)
{
$this->result = $result;
}
/**
* {@inheritdoc}
*/
public function getResult()
{
return $this->result;
}
}

View File

@ -0,0 +1,33 @@
<?php
namespace Amp\Ipc\Sync\Internal;
final class ParcelStorage extends \Threaded
{
/** @var mixed */
private $value;
/**
* @param mixed $value
*/
public function __construct($value)
{
$this->value = $value;
}
/**
* @return mixed
*/
public function get()
{
return $this->value;
}
/**
* @param mixed $value
*/
public function set($value)
{
$this->value = $value;
}
}

48
lib/Sync/PanicError.php Normal file
View File

@ -0,0 +1,48 @@
<?php
namespace Amp\Ipc\Sync;
final class PanicError extends \Error
{
/** @var string Class name of uncaught exception. */
private $name;
/** @var string Stack trace of the panic. */
private $trace;
/**
* Creates a new panic error.
*
* @param string $name The uncaught exception class.
* @param string $message The panic message.
* @param string $trace The panic stack trace.
* @param \Throwable|null $previous Previous exception.
*/
public function __construct(string $name, string $message = '', string $trace = '', \Throwable $previous = null)
{
parent::__construct($message, 0, $previous);
$this->name = $name;
$this->trace = $trace;
}
/**
* Returns the class name of the uncaught exception.
*
* @return string
*/
public function getName(): string
{
return $this->name;
}
/**
* Gets the stack trace at the point the panic occurred.
*
* @return string
*/
public function getPanicTrace(): string
{
return $this->trace;
}
}

38
lib/Sync/Parcel.php Normal file
View File

@ -0,0 +1,38 @@
<?php
namespace Amp\Ipc\Sync;
use Amp\Promise;
/**
* A parcel object for sharing data across execution contexts.
*
* A parcel is an object that stores a value in a safe way that can be shared
* between different threads or processes. Different handles to the same parcel
* will access the same data, and a parcel handle itself is serializable and
* can be transported to other execution contexts.
*
* Wrapping and unwrapping values in the parcel are not atomic. To prevent race
* conditions and guarantee safety, you should use the provided synchronization
* methods to acquire a lock for exclusive access to the parcel first before
* accessing the contained value.
*/
interface Parcel
{
/**
* Asynchronously invokes a callback while maintaining an exclusive lock on the parcel. The current value of the
* parcel is provided as the first argument to the callback function.
*
* @param callable $callback The synchronized callback to invoke. The parcel value is given as the single argument
* to the callback function. The callback may be a regular function or a coroutine.
*
* @return \Amp\Promise<mixed> Resolves with the return value of $callback or fails if $callback
* throws an exception.
*/
public function synchronized(callable $callback): Promise;
/**
* @return \Amp\Promise<mixed> A promise for the value inside the parcel.
*/
public function unwrap(): Promise;
}

View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Sync;
class SerializationException extends \Exception
{
}

View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Sync;
class SharedMemoryException extends \Exception
{
}

View File

@ -0,0 +1,385 @@
<?php
namespace Amp\Ipc\Sync;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use Amp\Sync\PosixSemaphore;
use function Amp\call;
/**
* A container object for sharing a value across contexts.
*
* A shared object is a container that stores an object inside shared memory.
* The object can be accessed and mutated by any thread or process. The shared
* object handle itself is serializable and can be sent to any thread or process
* to give access to the value that is shared in the container.
*
* Because each shared object uses its own shared memory segment, it is much
* more efficient to store a larger object containing many values inside a
* single shared container than to use many small shared containers.
*
* Note that accessing a shared object is not atomic. Access to a shared object
* should be protected with a mutex to preserve data integrity.
*
* When used with forking, the object must be created prior to forking for both
* processes to access the synchronized object.
*
* @see http://php.net/manual/en/book.shmop.php The shared memory extension.
* @see http://man7.org/linux/man-pages/man2/shmctl.2.html How shared memory works on Linux.
* @see https://msdn.microsoft.com/en-us/library/ms810613.aspx How shared memory works on Windows.
*/
final class SharedMemoryParcel implements Parcel
{
/** @var int The byte offset to the start of the object data in memory. */
const MEM_DATA_OFFSET = 7;
// A list of valid states the object can be in.
const STATE_UNALLOCATED = 0;
const STATE_ALLOCATED = 1;
const STATE_MOVED = 2;
const STATE_FREED = 3;
/** @var string */
private $id;
/** @var int The shared memory segment key. */
private $key;
/** @var PosixSemaphore A semaphore for synchronizing on the parcel. */
private $semaphore;
/** @var int An open handle to the shared memory segment. */
private $handle;
/** @var int */
private $initializer = 0;
/**
* @param string $id
* @param mixed $value
* @param int $size The initial size in bytes of the shared memory segment. It will automatically be expanded as
* necessary.
* @param int $permissions Permissions to access the semaphore. Use file permission format specified as 0xxx.
*
* @return \Amp\Ipc\Sync\SharedMemoryParcel
*/
public static function create(string $id, $value, int $size = 8192, int $permissions = 0600): self
{
$parcel = new self($id);
$parcel->init($value, $size, $permissions);
return $parcel;
}
/**
* @param string $id
*
* @return \Amp\Ipc\Sync\SharedMemoryParcel
*/
public static function use(string $id): self
{
$parcel = new self($id);
$parcel->open();
return $parcel;
}
/**
* Creates a new local object container.
*
* The object given will be assigned a new object ID and will have a
* reference to it stored in memory local to the thread.
*
* @param mixed $value The value to store in the container.
* @param int $size The number of bytes to allocate for the object.
* If not specified defaults to 16384 bytes.
* @param int $permissions The access permissions to set for the object.
* If not specified defaults to 0600.
*/
private function __construct(string $id)
{
if (!\extension_loaded("shmop")) {
throw new \Error(__CLASS__ . " requires the shmop extension.");
}
$this->id = $id;
$this->key = self::makeKey($this->id);
}
/**
* @param mixed $value
* @param int $size
* @param int $permissions
*/
private function init($value, int $size = 8192, int $permissions = 0600)
{
$this->semaphore = PosixSemaphore::create($this->id, 1);
$this->initializer = \getmypid();
$this->memOpen($this->key, 'n', $permissions, $size + self::MEM_DATA_OFFSET);
$this->setHeader(self::STATE_ALLOCATED, 0, $permissions);
$this->wrap($value);
}
private function open()
{
$this->semaphore = PosixSemaphore::use($this->id);
$this->memOpen($this->key, 'w', 0, 0);
}
/**
* Checks if the object has been freed.
*
* Note that this does not check if the object has been destroyed; it only
* checks if this handle has freed its reference to the object.
*
* @return bool True if the object is freed, otherwise false.
*/
private function isFreed(): bool
{
// If we are no longer connected to the memory segment, check if it has
// been invalidated.
if ($this->handle !== null) {
$this->handleMovedMemory();
$header = $this->getHeader();
return $header['state'] === static::STATE_FREED;
}
return true;
}
/**
* {@inheritdoc}
*/
public function unwrap(): Promise
{
if ($this->isFreed()) {
return new Failure(new SharedMemoryException('The object has already been freed.'));
}
$header = $this->getHeader();
// Make sure the header is in a valid state and format.
if ($header['state'] !== self::STATE_ALLOCATED || $header['size'] <= 0) {
new Failure(new SharedMemoryException('Shared object memory is corrupt.'));
}
// Read the actual value data from memory and unserialize it.
$data = $this->memGet(self::MEM_DATA_OFFSET, $header['size']);
return new Success(\unserialize($data));
}
/**
* If the value requires more memory to store than currently allocated, a
* new shared memory segment will be allocated with a larger size to store
* the value in. The previous memory segment will be cleaned up and marked
* for deletion. Other processes and threads will be notified of the new
* memory segment on the next read attempt. Once all running processes and
* threads disconnect from the old segment, it will be freed by the OS.
*/
protected function wrap($value)
{
if ($this->isFreed()) {
throw new SharedMemoryException('The object has already been freed.');
}
$serialized = \serialize($value);
$size = \strlen($serialized);
$header = $this->getHeader();
/* If we run out of space, we need to allocate a new shared memory
segment that is larger than the current one. To coordinate with other
processes, we will leave a message in the old segment that the segment
has moved and along with the new key. The old segment will be discarded
automatically after all other processes notice the change and close
the old handle.
*/
if (\shmop_size($this->handle) < $size + self::MEM_DATA_OFFSET) {
$this->key = $this->key < 0xffffffff ? $this->key + 1 : \mt_rand(0x10, 0xfffffffe);
$this->setHeader(self::STATE_MOVED, $this->key, 0);
$this->memDelete();
\shmop_close($this->handle);
$this->memOpen($this->key, 'n', $header['permissions'], $size * 2);
}
// Rewrite the header and the serialized value to memory.
$this->setHeader(self::STATE_ALLOCATED, $size, $header['permissions']);
$this->memSet(self::MEM_DATA_OFFSET, $serialized);
}
/**
* {@inheritdoc}
*/
public function synchronized(callable $callback): Promise
{
return call(function () use ($callback) {
/** @var \Amp\Sync\Lock $lock */
$lock = yield $this->semaphore->acquire();
try {
$result = yield call($callback, yield $this->unwrap());
if ($result !== null) {
$this->wrap($result);
}
} finally {
$lock->release();
}
return $result;
});
}
/**
* Frees the shared object from memory.
*
* The memory containing the shared value will be invalidated. When all
* process disconnect from the object, the shared memory block will be
* destroyed by the OS.
*/
public function __destruct()
{
if ($this->initializer === 0 || $this->initializer !== \getmypid()) {
return;
}
if ($this->isFreed()) {
return;
}
// Invalidate the memory block by setting its state to FREED.
$this->setHeader(static::STATE_FREED, 0, 0);
// Request the block to be deleted, then close our local handle.
$this->memDelete();
\shmop_close($this->handle);
$this->handle = null;
$this->semaphore = null;
}
/**
* Private method to prevent cloning.
*/
private function __clone()
{
}
/**
* Private method to prevent serialization.
*/
private function __sleep()
{
}
/**
* Updates the current memory segment handle, handling any moves made on the
* data.
*/
private function handleMovedMemory()
{
// Read from the memory block and handle moved blocks until we find the
// correct block.
while (true) {
$header = $this->getHeader();
// If the state is STATE_MOVED, the memory is stale and has been moved
// to a new location. Move handle and try to read again.
if ($header['state'] !== self::STATE_MOVED) {
break;
}
\shmop_close($this->handle);
$this->key = $header['size'];
$this->memOpen($this->key, 'w', 0, 0);
}
}
/**
* Reads and returns the data header at the current memory segment.
*
* @return array An associative array of header data.
*/
private function getHeader(): array
{
$data = $this->memGet(0, self::MEM_DATA_OFFSET);
return \unpack('Cstate/Lsize/Spermissions', $data);
}
/**
* Sets the header data for the current memory segment.
*
* @param int $state An object state.
* @param int $size The size of the stored data, or other value.
* @param int $permissions The permissions mask on the memory segment.
*/
private function setHeader(int $state, int $size, int $permissions)
{
$header = \pack('CLS', $state, $size, $permissions);
$this->memSet(0, $header);
}
/**
* Opens a shared memory handle.
*
* @param int $key The shared memory key.
* @param string $mode The mode to open the shared memory in.
* @param int $permissions Process permissions on the shared memory.
* @param int $size The size to crate the shared memory in bytes.
*/
private function memOpen(int $key, string $mode, int $permissions, int $size)
{
$this->handle = @\shmop_open($key, $mode, $permissions, $size);
if ($this->handle === false) {
throw new SharedMemoryException('Failed to create shared memory block.');
}
}
/**
* Reads binary data from shared memory.
*
* @param int $offset The offset to read from.
* @param int $size The number of bytes to read.
*
* @return string The binary data at the given offset.
*/
private function memGet(int $offset, int $size): string
{
$data = \shmop_read($this->handle, $offset, $size);
if ($data === false) {
throw new SharedMemoryException('Failed to read from shared memory block.');
}
return $data;
}
/**
* Writes binary data to shared memory.
*
* @param int $offset The offset to write to.
* @param string $data The binary data to write.
*/
private function memSet(int $offset, string $data)
{
if (!\shmop_write($this->handle, $data, $offset)) {
throw new SharedMemoryException('Failed to write to shared memory block.');
}
}
/**
* Requests the shared memory segment to be deleted.
*/
private function memDelete()
{
if (!\shmop_delete($this->handle)) {
throw new SharedMemoryException('Failed to discard shared memory block.');
}
}
private static function makeKey(string $id): int
{
return \abs(\unpack("l", \md5($id, true))[1]);
}
}

View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Sync;
class SynchronizationError extends \Error
{
}

View File

@ -0,0 +1,62 @@
<?php
namespace Amp\Ipc\Sync;
use Amp\Promise;
use Amp\Success;
use Amp\Sync\ThreadedMutex;
use function Amp\call;
/**
* A thread-safe container that shares a value between multiple threads.
*/
final class ThreadedParcel implements Parcel
{
/** @var \Amp\Sync\ThreadedMutex */
private $mutex;
/** @var \Threaded */
private $storage;
/**
* Creates a new shared object container.
*
* @param mixed $value The value to store in the container.
*/
public function __construct($value)
{
$this->mutex = new ThreadedMutex;
$this->storage = new Internal\ParcelStorage($value);
}
/**
* {@inheritdoc}
*/
public function unwrap(): Promise
{
return new Success($this->storage->get());
}
/**
* @return \Amp\Promise
*/
public function synchronized(callable $callback): Promise
{
return call(function () use ($callback) {
/** @var \Amp\Sync\Lock $lock */
$lock = yield $this->mutex->acquire();
try {
$result = yield call($callback, $this->storage->get());
if ($result !== null) {
$this->storage->set($result);
}
} finally {
$lock->release();
}
return $result;
});
}
}

78
lib/functions.php Normal file
View File

@ -0,0 +1,78 @@
<?php
namespace Amp\Ipc;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Promise;
use function Amp\call;
/**
* Connect to IPC server.
*
* @param string $uri URI
*
* @return Promise<ChannelledSocket>
*/
function connect(string $uri): Promise
{
return call(static function () use ($uri) {
$type = \filetype($uri);
if ($type === 'fifo') {
$suffix = \bin2hex(\random_bytes(10));
$prefix = \sys_get_temp_dir()."/amp-".$suffix.".fifo";
if (\strlen($prefix) > 0xFFFF) {
\trigger_error("Prefix is too long!", E_USER_ERROR);
exit(1);
}
$sockets = [
$prefix."2",
$prefix."1",
];
foreach ($sockets as $k => &$socket) {
if (!\posix_mkfifo($socket, 0777)) {
\trigger_error("Could not create FIFO client socket", E_USER_ERROR);
exit(1);
}
\register_shutdown_function(static function () use ($socket): void {
@\unlink($socket);
});
if (!$socket = \fopen($socket, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
\trigger_error("Could not open FIFO client socket", E_USER_ERROR);
exit(1);
}
}
if (!$tempSocket = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
\trigger_error("Could not connect to FIFO server", E_USER_ERROR);
exit(1);
}
\stream_set_blocking($tempSocket, false);
\stream_set_write_buffer($tempSocket, 0);
if (!\fwrite($tempSocket, \pack('v', \strlen($prefix)).$prefix)) {
\trigger_error("Failure sending request to FIFO server", E_USER_ERROR);
exit(1);
}
\fclose($tempSocket);
$tempSocket = null;
return new ChannelledSocket(...$sockets);
}
if ($type === 'file') {
$uri = \file_get_contents($uri);
} else {
$uri = "unix://$uri";
}
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
\trigger_error("Could not connect to IPC socket", E_USER_ERROR);
exit(1);
}
return new ChannelledSocket($socket, $socket);
});
}

28
phpunit.xml.dist Normal file
View File

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/6.0/phpunit.xsd"
backupGlobals="false"
backupStaticAttributes="false"
bootstrap="vendor/autoload.php"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
>
<testsuites>
<testsuite name="Main">
<directory>test</directory>
</testsuite>
</testsuites>
<filter>
<whitelist>
<directory suffix=".php">lib</directory>
</whitelist>
</filter>
<listeners>
<listener class="Amp\PHPUnit\LoopReset"/>
</listeners>
</phpunit>