1
0
mirror of https://github.com/danog/ipc.git synced 2024-11-30 04:29:09 +01:00

Add tests

This commit is contained in:
Daniil Gentili 2020-03-05 21:12:35 +01:00
parent 219b05c9da
commit a87af0e82b
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
20 changed files with 301 additions and 651 deletions

View File

@ -4,19 +4,76 @@
[![CoverageStatus](https://img.shields.io/coveralls/amphp/template/master.svg?style=flat-square)](https://coveralls.io/github/amphp/template?branch=master) [![CoverageStatus](https://img.shields.io/coveralls/amphp/template/master.svg?style=flat-square)](https://coveralls.io/github/amphp/template?branch=master)
![License](https://img.shields.io/badge/license-MIT-blue.svg?style=flat-square) ![License](https://img.shields.io/badge/license-MIT-blue.svg?style=flat-square)
`amphp/template` provides a template for AMPHP repos. `amphp/ipc` provides an async IPC server.
## Installation ## Installation
```bash ```bash
composer require amphp/template composer require amphp/ipc
``` ```
## Example ## Example
Server:
```php ```php
<?php <?php
require 'vendor/autoload.php'; require 'vendor/autoload.php';
use Amp\Ipc\IpcServer;
use Amp\Loop;
use Amp\Ipc\Sync\ChannelledSocket;
use function Amp\asyncCall;
Loop::run(static function () {
$clientHandler = function (ChannelledSocket $socket) {
echo "Accepted connection".PHP_EOL;
while ($payload = yield $socket->receive()) {
echo "Received $payload".PHP_EOL;
if ($payload === 'ping') {
yield $socket->send('pong');
}
}
yield $socket->disconnect();
echo "Closed connection".PHP_EOL;
};
$server = new IpcServer(sys_get_temp_dir().'/test');
while ($socket = yield $server->accept()) {
asyncCall($clientHandler, $socket);
}
});
``` ```
Client:
```php
<?php
require 'vendor/autoload.php';
use Amp\Loop;
use Amp\Ipc\Sync\ChannelledSocket;
use function Amp\asyncCall;
use function Amp\Ipc\connect;
Loop::run(static function () {
$clientHandler = function (ChannelledSocket $socket) {
echo "Created connection.".PHP_EOL;
while ($payload = yield $socket->receive()) {
echo "Received $payload".PHP_EOL;
yield $socket->disconnect();
}
echo "Closed connection".PHP_EOL;
};
$channel = yield connect(sys_get_temp_dir().'/test');
asyncCall($clientHandler, $channel);
yield $channel->send('ping');
});
```

View File

@ -26,12 +26,14 @@
], ],
"require": { "require": {
"php": ">=7.1", "php": ">=7.1",
"amphp/byte-stream": "^1.7" "amphp/byte-stream": "^1.7",
"amphp/parser": "^1.0"
}, },
"require-dev": { "require-dev": {
"amphp/amp": "^2.4", "amphp/amp": "^2.4",
"phpunit/phpunit": "^8 || ^7", "amphp/parallel": "^1.3",
"amphp/phpunit-util": "^1.1", "phpunit/phpunit": "^9",
"amphp/phpunit-util": "^1.3",
"amphp/php-cs-fixer-config": "dev-master" "amphp/php-cs-fixer-config": "dev-master"
}, },
"autoload": { "autoload": {

View File

@ -14,7 +14,7 @@ Loop::run(static function () {
while ($payload = yield $socket->receive()) { while ($payload = yield $socket->receive()) {
echo "Received $payload".PHP_EOL; echo "Received $payload".PHP_EOL;
$socket->close(); yield $socket->disconnect();
} }
echo "Closed connection".PHP_EOL; echo "Closed connection".PHP_EOL;
}; };

View File

@ -18,11 +18,12 @@ Loop::run(static function () {
yield $socket->send('pong'); yield $socket->send('pong');
} }
} }
echo "Closed connection".PHP_EOL; yield $socket->disconnect();
echo "Closed connection".PHP_EOL."==========".PHP_EOL;
}; };
$server = new IpcServer(sys_get_temp_dir().'/test'); $server = new IpcServer(sys_get_temp_dir().'/test');
while ($socket = yield $server->accept()) { while ($socket = yield $server->accept()) {
asyncCall($clientHandler, $socket); asyncCall($clientHandler, $socket);
} }
}); });

View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Sync;
final class ChannelCloseAck implements ChannelCloseMsg
{
}

View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Sync;
interface ChannelCloseMsg
{
}

View File

@ -0,0 +1,7 @@
<?php
namespace Amp\Ipc\Sync;
final class ChannelCloseReq implements ChannelCloseMsg
{
}

View File

@ -5,9 +5,19 @@ namespace Amp\Ipc\Sync;
use Amp\ByteStream\ResourceInputStream; use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream; use Amp\ByteStream\ResourceOutputStream;
use Amp\Promise; use Amp\Promise;
use Amp\Success;
use function Amp\call;
final class ChannelledSocket implements Channel final class ChannelledSocket implements Channel
{ {
private const ESTABLISHED = 0;
private const GOT_FIN_MASK = 1;
private const GOT_ACK_MASK = 2;
private const GOT_ALL_MASK = 3;
/** @var ChannelledStream */ /** @var ChannelledStream */
private $channel; private $channel;
@ -17,6 +27,9 @@ final class ChannelledSocket implements Channel
/** @var ResourceOutputStream */ /** @var ResourceOutputStream */
private $write; private $write;
/** @var int */
private $state = self::ESTABLISHED;
/** /**
* @param resource $read Readable stream resource. * @param resource $read Readable stream resource.
* @param resource $write Writable stream resource. * @param resource $write Writable stream resource.
@ -36,7 +49,51 @@ final class ChannelledSocket implements Channel
*/ */
public function receive(): Promise public function receive(): Promise
{ {
return $this->channel->receive(); if (!$this->channel) {
return new Success();
}
return call(function (): \Generator {
$data = yield $this->channel->receive();
if ($data instanceof ChannelCloseReq) {
yield $this->channel->send(new ChannelCloseAck);
$this->state = self::GOT_FIN_MASK;
return null;
}
return $data;
});
}
/**
* Cleanly disconnect from other endpoint.
*
* @return Promise
*/
public function disconnect(): Promise
{
if (!$this->channel) {
throw new ChannelException('The channel was already closed!');
}
$channel = $this->channel;
$this->channel = null;
return call(function () use ($channel): \Generator {
yield $channel->send(new ChannelCloseReq);
do {
$data = yield $channel->receive();
if ($data instanceof ChannelCloseReq) {
yield $channel->send(new ChannelCloseAck);
$this->state |= self::GOT_FIN_MASK;
} else if ($data instanceof ChannelCloseAck) {
$this->state |= self::GOT_ACK_MASK;
}
} while ($this->state !== self::GOT_ALL_MASK);
$this->close();
});
} }
/** /**
@ -44,14 +101,23 @@ final class ChannelledSocket implements Channel
*/ */
public function send($data): Promise public function send($data): Promise
{ {
if (!$this->channel) {
throw new ChannelException('The channel was already closed!');
}
return $this->channel->send($data); return $this->channel->send($data);
} }
/**
* {@inheritdoc}
*/
public function unreference() public function unreference()
{ {
$this->read->unreference(); $this->read->unreference();
} }
/**
* {@inheritdoc}
*/
public function reference() public function reference()
{ {
$this->read->reference(); $this->read->reference();
@ -60,7 +126,7 @@ final class ChannelledSocket implements Channel
/** /**
* Closes the read and write resource streams. * Closes the read and write resource streams.
*/ */
public function close() private function close()
{ {
$this->read->close(); $this->read->close();
$this->write->close(); $this->write->close();

View File

@ -69,7 +69,7 @@ final class ChannelledStream implements Channel
} }
if ($chunk === null) { if ($chunk === null) {
return null; throw new ChannelException("The channel closed unexpectedly. Did the context die?");
} }
$this->parser->push($chunk); $this->parser->push($chunk);

View File

@ -1,60 +0,0 @@
<?php
namespace Amp\Ipc\Sync;
final class ExitFailure implements ExitResult
{
/** @var string */
private $type;
/** @var string */
private $message;
/** @var int|string */
private $code;
/** @var array */
private $trace;
/** @var self|null */
private $previous;
public function __construct(\Throwable $exception)
{
$this->type = \get_class($exception);
$this->message = $exception->getMessage();
$this->code = $exception->getCode();
$this->trace = $exception->getTraceAsString();
if ($previous = $exception->getPrevious()) {
$this->previous = new self($previous);
}
}
/**
* {@inheritdoc}
*/
public function getResult()
{
throw $this->createException();
}
private function createException(): PanicError
{
$previous = $this->previous ? $this->previous->createException() : null;
return new PanicError(
$this->type,
\sprintf(
'Uncaught %s in worker with message "%s" and code "%s"; use %s::getPanicTrace() '
. 'for the stack trace in the context',
$this->type,
$this->message,
$this->code,
PanicError::class
),
$this->trace,
$previous
);
}
}

View File

@ -1,13 +0,0 @@
<?php
namespace Amp\Ipc\Sync;
interface ExitResult
{
/**
* @return mixed Return value of the callable given to the execution context.
*
* @throws \Amp\Ipc\Sync\PanicError If the context exited with an uncaught exception.
*/
public function getResult();
}

View File

@ -1,22 +0,0 @@
<?php
namespace Amp\Ipc\Sync;
final class ExitSuccess implements ExitResult
{
/** @var mixed */
private $result;
public function __construct($result)
{
$this->result = $result;
}
/**
* {@inheritdoc}
*/
public function getResult()
{
return $this->result;
}
}

View File

@ -1,33 +0,0 @@
<?php
namespace Amp\Ipc\Sync\Internal;
final class ParcelStorage extends \Threaded
{
/** @var mixed */
private $value;
/**
* @param mixed $value
*/
public function __construct($value)
{
$this->value = $value;
}
/**
* @return mixed
*/
public function get()
{
return $this->value;
}
/**
* @param mixed $value
*/
public function set($value)
{
$this->value = $value;
}
}

View File

@ -1,7 +0,0 @@
<?php
namespace Amp\Ipc\Sync;
class SharedMemoryException extends \Exception
{
}

View File

@ -1,385 +0,0 @@
<?php
namespace Amp\Ipc\Sync;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use Amp\Sync\PosixSemaphore;
use function Amp\call;
/**
* A container object for sharing a value across contexts.
*
* A shared object is a container that stores an object inside shared memory.
* The object can be accessed and mutated by any thread or process. The shared
* object handle itself is serializable and can be sent to any thread or process
* to give access to the value that is shared in the container.
*
* Because each shared object uses its own shared memory segment, it is much
* more efficient to store a larger object containing many values inside a
* single shared container than to use many small shared containers.
*
* Note that accessing a shared object is not atomic. Access to a shared object
* should be protected with a mutex to preserve data integrity.
*
* When used with forking, the object must be created prior to forking for both
* processes to access the synchronized object.
*
* @see http://php.net/manual/en/book.shmop.php The shared memory extension.
* @see http://man7.org/linux/man-pages/man2/shmctl.2.html How shared memory works on Linux.
* @see https://msdn.microsoft.com/en-us/library/ms810613.aspx How shared memory works on Windows.
*/
final class SharedMemoryParcel implements Parcel
{
/** @var int The byte offset to the start of the object data in memory. */
const MEM_DATA_OFFSET = 7;
// A list of valid states the object can be in.
const STATE_UNALLOCATED = 0;
const STATE_ALLOCATED = 1;
const STATE_MOVED = 2;
const STATE_FREED = 3;
/** @var string */
private $id;
/** @var int The shared memory segment key. */
private $key;
/** @var PosixSemaphore A semaphore for synchronizing on the parcel. */
private $semaphore;
/** @var int An open handle to the shared memory segment. */
private $handle;
/** @var int */
private $initializer = 0;
/**
* @param string $id
* @param mixed $value
* @param int $size The initial size in bytes of the shared memory segment. It will automatically be expanded as
* necessary.
* @param int $permissions Permissions to access the semaphore. Use file permission format specified as 0xxx.
*
* @return \Amp\Ipc\Sync\SharedMemoryParcel
*/
public static function create(string $id, $value, int $size = 8192, int $permissions = 0600): self
{
$parcel = new self($id);
$parcel->init($value, $size, $permissions);
return $parcel;
}
/**
* @param string $id
*
* @return \Amp\Ipc\Sync\SharedMemoryParcel
*/
public static function use(string $id): self
{
$parcel = new self($id);
$parcel->open();
return $parcel;
}
/**
* Creates a new local object container.
*
* The object given will be assigned a new object ID and will have a
* reference to it stored in memory local to the thread.
*
* @param mixed $value The value to store in the container.
* @param int $size The number of bytes to allocate for the object.
* If not specified defaults to 16384 bytes.
* @param int $permissions The access permissions to set for the object.
* If not specified defaults to 0600.
*/
private function __construct(string $id)
{
if (!\extension_loaded("shmop")) {
throw new \Error(__CLASS__ . " requires the shmop extension.");
}
$this->id = $id;
$this->key = self::makeKey($this->id);
}
/**
* @param mixed $value
* @param int $size
* @param int $permissions
*/
private function init($value, int $size = 8192, int $permissions = 0600)
{
$this->semaphore = PosixSemaphore::create($this->id, 1);
$this->initializer = \getmypid();
$this->memOpen($this->key, 'n', $permissions, $size + self::MEM_DATA_OFFSET);
$this->setHeader(self::STATE_ALLOCATED, 0, $permissions);
$this->wrap($value);
}
private function open()
{
$this->semaphore = PosixSemaphore::use($this->id);
$this->memOpen($this->key, 'w', 0, 0);
}
/**
* Checks if the object has been freed.
*
* Note that this does not check if the object has been destroyed; it only
* checks if this handle has freed its reference to the object.
*
* @return bool True if the object is freed, otherwise false.
*/
private function isFreed(): bool
{
// If we are no longer connected to the memory segment, check if it has
// been invalidated.
if ($this->handle !== null) {
$this->handleMovedMemory();
$header = $this->getHeader();
return $header['state'] === static::STATE_FREED;
}
return true;
}
/**
* {@inheritdoc}
*/
public function unwrap(): Promise
{
if ($this->isFreed()) {
return new Failure(new SharedMemoryException('The object has already been freed.'));
}
$header = $this->getHeader();
// Make sure the header is in a valid state and format.
if ($header['state'] !== self::STATE_ALLOCATED || $header['size'] <= 0) {
new Failure(new SharedMemoryException('Shared object memory is corrupt.'));
}
// Read the actual value data from memory and unserialize it.
$data = $this->memGet(self::MEM_DATA_OFFSET, $header['size']);
return new Success(\unserialize($data));
}
/**
* If the value requires more memory to store than currently allocated, a
* new shared memory segment will be allocated with a larger size to store
* the value in. The previous memory segment will be cleaned up and marked
* for deletion. Other processes and threads will be notified of the new
* memory segment on the next read attempt. Once all running processes and
* threads disconnect from the old segment, it will be freed by the OS.
*/
protected function wrap($value)
{
if ($this->isFreed()) {
throw new SharedMemoryException('The object has already been freed.');
}
$serialized = \serialize($value);
$size = \strlen($serialized);
$header = $this->getHeader();
/* If we run out of space, we need to allocate a new shared memory
segment that is larger than the current one. To coordinate with other
processes, we will leave a message in the old segment that the segment
has moved and along with the new key. The old segment will be discarded
automatically after all other processes notice the change and close
the old handle.
*/
if (\shmop_size($this->handle) < $size + self::MEM_DATA_OFFSET) {
$this->key = $this->key < 0xffffffff ? $this->key + 1 : \mt_rand(0x10, 0xfffffffe);
$this->setHeader(self::STATE_MOVED, $this->key, 0);
$this->memDelete();
\shmop_close($this->handle);
$this->memOpen($this->key, 'n', $header['permissions'], $size * 2);
}
// Rewrite the header and the serialized value to memory.
$this->setHeader(self::STATE_ALLOCATED, $size, $header['permissions']);
$this->memSet(self::MEM_DATA_OFFSET, $serialized);
}
/**
* {@inheritdoc}
*/
public function synchronized(callable $callback): Promise
{
return call(function () use ($callback) {
/** @var \Amp\Sync\Lock $lock */
$lock = yield $this->semaphore->acquire();
try {
$result = yield call($callback, yield $this->unwrap());
if ($result !== null) {
$this->wrap($result);
}
} finally {
$lock->release();
}
return $result;
});
}
/**
* Frees the shared object from memory.
*
* The memory containing the shared value will be invalidated. When all
* process disconnect from the object, the shared memory block will be
* destroyed by the OS.
*/
public function __destruct()
{
if ($this->initializer === 0 || $this->initializer !== \getmypid()) {
return;
}
if ($this->isFreed()) {
return;
}
// Invalidate the memory block by setting its state to FREED.
$this->setHeader(static::STATE_FREED, 0, 0);
// Request the block to be deleted, then close our local handle.
$this->memDelete();
\shmop_close($this->handle);
$this->handle = null;
$this->semaphore = null;
}
/**
* Private method to prevent cloning.
*/
private function __clone()
{
}
/**
* Private method to prevent serialization.
*/
private function __sleep()
{
}
/**
* Updates the current memory segment handle, handling any moves made on the
* data.
*/
private function handleMovedMemory()
{
// Read from the memory block and handle moved blocks until we find the
// correct block.
while (true) {
$header = $this->getHeader();
// If the state is STATE_MOVED, the memory is stale and has been moved
// to a new location. Move handle and try to read again.
if ($header['state'] !== self::STATE_MOVED) {
break;
}
\shmop_close($this->handle);
$this->key = $header['size'];
$this->memOpen($this->key, 'w', 0, 0);
}
}
/**
* Reads and returns the data header at the current memory segment.
*
* @return array An associative array of header data.
*/
private function getHeader(): array
{
$data = $this->memGet(0, self::MEM_DATA_OFFSET);
return \unpack('Cstate/Lsize/Spermissions', $data);
}
/**
* Sets the header data for the current memory segment.
*
* @param int $state An object state.
* @param int $size The size of the stored data, or other value.
* @param int $permissions The permissions mask on the memory segment.
*/
private function setHeader(int $state, int $size, int $permissions)
{
$header = \pack('CLS', $state, $size, $permissions);
$this->memSet(0, $header);
}
/**
* Opens a shared memory handle.
*
* @param int $key The shared memory key.
* @param string $mode The mode to open the shared memory in.
* @param int $permissions Process permissions on the shared memory.
* @param int $size The size to crate the shared memory in bytes.
*/
private function memOpen(int $key, string $mode, int $permissions, int $size)
{
$this->handle = @\shmop_open($key, $mode, $permissions, $size);
if ($this->handle === false) {
throw new SharedMemoryException('Failed to create shared memory block.');
}
}
/**
* Reads binary data from shared memory.
*
* @param int $offset The offset to read from.
* @param int $size The number of bytes to read.
*
* @return string The binary data at the given offset.
*/
private function memGet(int $offset, int $size): string
{
$data = \shmop_read($this->handle, $offset, $size);
if ($data === false) {
throw new SharedMemoryException('Failed to read from shared memory block.');
}
return $data;
}
/**
* Writes binary data to shared memory.
*
* @param int $offset The offset to write to.
* @param string $data The binary data to write.
*/
private function memSet(int $offset, string $data)
{
if (!\shmop_write($this->handle, $data, $offset)) {
throw new SharedMemoryException('Failed to write to shared memory block.');
}
}
/**
* Requests the shared memory segment to be deleted.
*/
private function memDelete()
{
if (!\shmop_delete($this->handle)) {
throw new SharedMemoryException('Failed to discard shared memory block.');
}
}
private static function makeKey(string $id): int
{
return \abs(\unpack("l", \md5($id, true))[1]);
}
}

View File

@ -1,62 +0,0 @@
<?php
namespace Amp\Ipc\Sync;
use Amp\Promise;
use Amp\Success;
use Amp\Sync\ThreadedMutex;
use function Amp\call;
/**
* A thread-safe container that shares a value between multiple threads.
*/
final class ThreadedParcel implements Parcel
{
/** @var \Amp\Sync\ThreadedMutex */
private $mutex;
/** @var \Threaded */
private $storage;
/**
* Creates a new shared object container.
*
* @param mixed $value The value to store in the container.
*/
public function __construct($value)
{
$this->mutex = new ThreadedMutex;
$this->storage = new Internal\ParcelStorage($value);
}
/**
* {@inheritdoc}
*/
public function unwrap(): Promise
{
return new Success($this->storage->get());
}
/**
* @return \Amp\Promise
*/
public function synchronized(callable $callback): Promise
{
return call(function () use ($callback) {
/** @var \Amp\Sync\Lock $lock */
$lock = yield $this->mutex->acquire();
try {
$result = yield call($callback, $this->storage->get());
if ($result !== null) {
$this->storage->set($result);
}
} finally {
$lock->release();
}
return $result;
});
}
}

View File

@ -17,62 +17,65 @@ use function Amp\call;
function connect(string $uri): Promise function connect(string $uri): Promise
{ {
return call(static function () use ($uri) { return call(static function () use ($uri) {
if (!file_exists($uri)) {
throw new \RuntimeException("The endpoint does not exist!");
}
$type = \filetype($uri); $type = \filetype($uri);
if ($type === 'fifo') { if ($type !== 'fifo') {
$suffix = \bin2hex(\random_bytes(10)); if ($type === 'file') {
$prefix = \sys_get_temp_dir()."/amp-".$suffix.".fifo"; $uri = \file_get_contents($uri);
} else {
if (\strlen($prefix) > 0xFFFF) { $uri = "unix://$uri";
\trigger_error("Prefix is too long!", E_USER_ERROR);
exit(1);
} }
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
$message = "Could not connect to IPC socket";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
throw new \RuntimeException($message);
}
return new ChannelledSocket($socket, $socket);
}
$sockets = [ $suffix = \bin2hex(\random_bytes(10));
$prefix = \sys_get_temp_dir()."/amp-".$suffix.".fifo";
if (\strlen($prefix) > 0xFFFF) {
throw new \RuntimeException('Prefix is too long!');
}
$sockets = [
$prefix."2", $prefix."2",
$prefix."1", $prefix."1",
]; ];
foreach ($sockets as $k => &$socket) { foreach ($sockets as $k => &$socket) {
if (!\posix_mkfifo($socket, 0777)) { if (!\posix_mkfifo($socket, 0777)) {
\trigger_error("Could not create FIFO client socket", E_USER_ERROR); throw new \RuntimeException('Could not create FIFO client socket!');
exit(1);
}
\register_shutdown_function(static function () use ($socket): void {
@\unlink($socket);
});
if (!$socket = \fopen($socket, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
\trigger_error("Could not open FIFO client socket", E_USER_ERROR);
exit(1);
}
} }
if (!$tempSocket = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader \register_shutdown_function(static function () use ($socket): void {
\trigger_error("Could not connect to FIFO server", E_USER_ERROR); @\unlink($socket);
exit(1); });
}
\stream_set_blocking($tempSocket, false);
\stream_set_write_buffer($tempSocket, 0);
if (!\fwrite($tempSocket, \pack('v', \strlen($prefix)).$prefix)) { if (!$socket = \fopen($socket, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
\trigger_error("Failure sending request to FIFO server", E_USER_ERROR); throw new \RuntimeException("Could not open FIFO client socket");
exit(1);
} }
\fclose($tempSocket); }
$tempSocket = null;
return new ChannelledSocket(...$sockets); if (!$tempSocket = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
throw new \RuntimeException("Could not connect to FIFO server");
} }
if ($type === 'file') { \stream_set_blocking($tempSocket, false);
$uri = \file_get_contents($uri); \stream_set_write_buffer($tempSocket, 0);
} else {
$uri = "unix://$uri"; if (!\fwrite($tempSocket, \pack('v', \strlen($prefix)).$prefix)) {
throw new \RuntimeException("Failure sending request to FIFO server");
} }
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) { \fclose($tempSocket);
\trigger_error("Could not connect to IPC socket", E_USER_ERROR); $tempSocket = null;
exit(1);
} return new ChannelledSocket(...$sockets);
return new ChannelledSocket($socket, $socket);
}); });
} }

View File

@ -1,19 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<phpunit <phpunit
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/6.0/phpunit.xsd" xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/4.1/phpunit.xsd"
backupGlobals="false" backupGlobals="false"
backupStaticAttributes="false" backupStaticAttributes="false"
bootstrap="vendor/autoload.php" bootstrap="vendor/autoload.php"
colors="true" colors="true"
convertErrorsToExceptions="true" convertErrorsToExceptions="true"
convertNoticesToExceptions="true" convertNoticesToExceptions="true"
convertWarningsToExceptions="true" convertWarningsToExceptions="true"
processIsolation="false" processIsolation="false"
stopOnFailure="false" stopOnFailure="false"
> >
<testsuites> <testsuites>
<testsuite name="Main"> <testsuite name="Amp Concurrent">
<directory>test</directory> <directory>test</directory>
</testsuite> </testsuite>
</testsuites> </testsuites>
@ -22,7 +22,7 @@
<directory suffix=".php">lib</directory> <directory suffix=".php">lib</directory>
</whitelist> </whitelist>
</filter> </filter>
<listeners> <logging>
<listener class="Amp\PHPUnit\LoopReset"/> <log type="coverage-html" target="build/coverage"/>
</listeners> </logging>
</phpunit> </phpunit>

36
test/Fixtures/server.php Normal file
View File

@ -0,0 +1,36 @@
<?php
\error_reporting(E_ALL);
\ini_set('log_errors', 1);
\ini_set('error_log', '/tmp/amphp.log');
\error_log('Inited IPC test!');
use Amp\Ipc\IpcServer;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Parallel\Sync\Channel;
use function Amp\delay;
return function (Channel $channel) use ($argv) {
$server = new IpcServer($argv[1], $argv[2] === "1" ? true : false);
yield $channel->send($server->getUri());
$socket = yield $server->accept();
if (!$socket instanceof ChannelledSocket) {
throw new \RuntimeException('Socket is not instance of ChanneledSocket');
}
$ping = yield $socket->receive();
if ($ping !== 'ping') {
throw new \RuntimeException("Received $ping instead of ping!");
}
yield $socket->send('pong');
yield $socket->disconnect();
$server->close();
return $server->accept();
};

46
test/IpcTest.php Normal file
View File

@ -0,0 +1,46 @@
<?php
namespace Amp\Ipc\Test;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Parallel\Context\Process;
use Amp\PHPUnit\AsyncTestCase;
use function Amp\Ipc\connect;
class IpcTest extends AsyncTestCase
{
/** @dataProvider provideUriFifo */
public function testBasicIPC(string $uri, bool $fifo)
{
$process = new Process([__DIR__.'/Fixtures/server.php', $uri, $fifo]);
yield $process->start();
$recvUri = yield $process->receive();
if ($uri) {
$this->assertEquals($uri, $recvUri);
}
$client = yield connect($recvUri);
$this->assertInstanceOf(ChannelledSocket::class, $client);
yield $client->send('ping');
$this->assertEquals('pong', yield $client->receive());
yield $client->disconnect();
$this->assertNull(yield $process->join());
}
public function provideUriFifo(): \Generator
{
foreach (['', sys_get_temp_dir().'/pony'] as $uri) {
if (\strncasecmp(\PHP_OS, "WIN", 3) === 0) {
yield [$uri, false];
} else {
yield [$uri, true];
yield [$uri, false];
}
}
}
}