diff --git a/composer.json b/composer.json index 3732346..fb619c2 100755 --- a/composer.json +++ b/composer.json @@ -21,8 +21,9 @@ } ], "require": { - "amphp/amp": "^2", + "amphp/amp": "^2.0", "amphp/byte-stream": "dev-master as 0.1", + "amphp/parser": "^1.0", "amphp/process": "dev-amp_v2 as 0.2" }, "require-dev": { diff --git a/lib/Sync/ChannelParser.php b/lib/Sync/ChannelParser.php new file mode 100644 index 0000000..40927f2 --- /dev/null +++ b/lib/Sync/ChannelParser.php @@ -0,0 +1,83 @@ +readResource = $read; - $this->writeResource = $write; - $this->autoClose = $autoClose; - - \stream_set_blocking($this->readResource, false); - \stream_set_read_buffer($this->readResource, 0); - \stream_set_write_buffer($this->readResource, 0); - - if ($this->readResource !== $this->writeResource) { - \stream_set_blocking($this->writeResource, false); - \stream_set_read_buffer($this->writeResource, 0); - \stream_set_write_buffer($this->writeResource, 0); - } - - $this->reads = $reads = new \SplQueue; - $this->writes = $writes = new \SplQueue; - - $errorHandler = static function ($errno, $errstr, $errfile, $errline) { - if ($errno & \error_reporting()) { - throw new ChannelException(\sprintf( - 'Received corrupted data. Errno: %d; %s in file %s on line %d', - $errno, - $errstr, - $errfile, - $errline - )); - } - }; - - $this->readWatcher = Loop::onReadable($this->readResource, static function ($watcher, $stream) use ($reads, $errorHandler) { - while (!$reads->isEmpty()) { - /** @var \Amp\Deferred $deferred */ - list($buffer, $length, $deferred) = $reads->shift(); - - if ($length === 0) { - // Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes. - $data = @\fread($stream, self::HEADER_LENGTH - \strlen($buffer)); - - if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) { - $deferred->fail(new ChannelException("The socket unexpectedly closed")); - break; - } - - $buffer .= $data; - - if (\strlen($buffer) !== self::HEADER_LENGTH) { - // Not enough data available. - $reads->unshift([$buffer, 0, $deferred]); - return; - } - - $data = \unpack("Cprefix/Llength", $data); - - if ($data["prefix"] !== 0) { - $deferred->fail(new ChannelException("Invalid header received")); - break; - } - - $length = $data["length"]; - $buffer = ''; - } - - // Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes. - $data = @\fread($stream, $length - \strlen($buffer)); - - if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) { - $deferred->fail(new ChannelException("The socket unexpectedly closed")); - break; - } - - $buffer .= $data; - - if (\strlen($buffer) < $length) { - // Not enough data available. - $reads->unshift([$buffer, $length, $deferred]); - return; - } - - \set_error_handler($errorHandler); - - try { - // Attempt to unserialize the received data. - try { - $data = \unserialize($buffer); - } finally { - \restore_error_handler(); - } - - $deferred->resolve($data); - } catch (\Throwable $exception) { - $deferred->fail(new SerializationException("Exception thrown when unserializing data", $exception)); - } - } - - Loop::disable($watcher); - }); - - $this->writeWatcher = Loop::onWritable($this->writeResource, static function ($watcher, $stream) use ($writes) { - try { - while (!$writes->isEmpty()) { - /** @var \Amp\Deferred $deferred */ - list($data, $previous, $deferred) = $writes->shift(); - $length = \strlen($data); - - if ($length === 0) { - $deferred->resolve(0); - continue; - } - - // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full. - $written = @\fwrite($stream, $data); - - if ($written === false || $written === 0) { - $message = "Failed to write to socket"; - if ($error = \error_get_last()) { - $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]); - } - $exception = new ChannelException($message); - $deferred->fail($exception); - while (!$writes->isEmpty()) { - list(, , $deferred) = $writes->shift(); - $deferred->fail($exception); - } - return; - } - - if ($length <= $written) { - $deferred->resolve($written + $previous); - continue; - } - - $data = \substr($data, $written); - $writes->unshift([$data, $written + $previous, $deferred]); - return; - } - } finally { - if ($writes->isEmpty()) { - Loop::disable($watcher); - } - } - }); - - Loop::disable($this->readWatcher); - Loop::disable($this->writeWatcher); - } - - public function __destruct() { - if ($this->readResource !== null) { - $this->close(); - } + public function __construct($read, $write) { + parent::__construct( + $this->read = new ResourceInputStream($read), + $this->write = new ResourceOutputStream($write) + ); } /** - * {@inheritdoc} + * Closes the read and write resource streams. */ public function close() { - if (\is_resource($this->readResource)) { - if ($this->autoClose) { - @\fclose($this->readResource); - - if ($this->readResource !== $this->writeResource) { - @\fclose($this->writeResource); - } - } - $this->readResource = null; - $this->writeResource = null; - } - - $this->open = false; - - if (!$this->reads->isEmpty()) { - $exception = new ChannelException("The connection was unexpectedly closed before reading completed"); - do { - /** @var \Amp\Deferred $deferred */ - list(, , $deferred) = $this->reads->shift(); - $deferred->fail($exception); - } while (!$this->reads->isEmpty()); - } - - if (!$this->writes->isEmpty()) { - $exception = new ChannelException("The connection was unexpectedly writing completed"); - do { - /** @var \Amp\Deferred $deferred */ - list(, , $deferred) = $this->writes->shift(); - $deferred->fail($exception); - } while (!$this->writes->isEmpty()); - } - - Loop::cancel($this->readWatcher); - Loop::cancel($this->writeWatcher); + $this->read->close(); + $this->write->close(); } - - /** - * {@inheritdoc} - */ - public function receive(): Promise { - if (!$this->open) { - return new Failure(new ChannelException("The channel is has been closed")); - } - - $deferred = new Deferred; - $this->reads->push(["", 0, $deferred]); - Loop::enable($this->readWatcher); - return $deferred->promise(); - } - - /** - * @param string $data - * @param bool $end - * - * @return \Amp\Promise - */ - public function send($data): Promise { - if (!$this->open) { - return new Failure(new ChannelException("The channel is has been closed")); - } - - // Serialize the data to send into the channel. - try { - $data = \serialize($data); - } catch (\Throwable $exception) { - throw new SerializationException( - "The given data cannot be sent because it is not serializable.", $exception - ); - } - - $data = \pack("CL", 0, \strlen($data)) . $data; - $length = \strlen($data); - $written = 0; - - if ($this->writes->isEmpty()) { - // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full. - $written = @\fwrite($this->writeResource, $data); - - if ($written === false) { - $message = "Failed to write to stream"; - if ($error = \error_get_last()) { - $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]); - } - return new Failure(new ChannelException($message)); - } - - if ($length <= $written) { - return new Success($written); - } - - $data = \substr($data, $written); - } - - $deferred = new Deferred; - $this->writes->push([$data, $written, $deferred]); - Loop::enable($this->writeWatcher); - return $deferred->promise(); - } -} +} \ No newline at end of file diff --git a/lib/Sync/ChannelledStream.php b/lib/Sync/ChannelledStream.php index 8efbed4..eda4b4b 100644 --- a/lib/Sync/ChannelledStream.php +++ b/lib/Sync/ChannelledStream.php @@ -4,7 +4,6 @@ namespace Amp\Parallel\Sync; use Amp\ByteStream\InputStream; use Amp\ByteStream\OutputStream; -use Amp\ByteStream\Parser; use Amp\ByteStream\StreamException; use Amp\Coroutine; use Amp\Promise; @@ -15,8 +14,6 @@ use Amp\Promise; * Supports full duplex read and write. */ class ChannelledStream implements Channel { - const HEADER_LENGTH = 5; - /** @var \Amp\ByteStream\InputStream */ private $read; @@ -26,7 +23,7 @@ class ChannelledStream implements Channel { /** @var \SplQueue */ private $received; - /** @var \Amp\ByteStream\Parser */ + /** @var \Amp\Parser\Parser */ private $parser; /** @@ -39,51 +36,7 @@ class ChannelledStream implements Channel { $this->read = $read; $this->write = $write; $this->received = new \SplQueue; - $this->parser = new Parser(self::parser($this->received, static function ($errno, $errstr, $errfile, $errline) { - if ($errno & \error_reporting()) { - throw new ChannelException(\sprintf( - 'Received corrupted data. Errno: %d; %s in file %s on line %d', - $errno, - $errstr, - $errfile, - $errline - )); - } - })); - } - - /** - * @param \SplQueue $queue - * @param callable $errorHandler - * - * @return \Generator - * - * @throws \Amp\Parallel\Sync\ChannelException - * @throws \Amp\Parallel\Sync\SerializationException - */ - private static function parser(\SplQueue $queue, callable $errorHandler): \Generator { - while (true) { - $data = \unpack("Cprefix/Llength", yield self::HEADER_LENGTH); - - if ($data["prefix"] !== 0) { - throw new ChannelException("Invalid header received"); - } - - $data = yield $data["length"]; - - \set_error_handler($errorHandler); - - // Attempt to unserialize the received data. - try { - $data = \unserialize($data); - } catch (\Throwable $exception) { - throw new SerializationException("Exception thrown when unserializing data", $exception); - } finally { - \restore_error_handler(); - } - - $queue->push($data); - } + $this->parser = new ChannelParser([$this->received, 'push']); } /** @@ -94,17 +47,8 @@ class ChannelledStream implements Channel { } private function doSend($data): \Generator { - // Serialize the data to send into the channel. try { - $data = \serialize($data); - } catch (\Throwable $exception) { - throw new SerializationException( - "The given data cannot be sent because it is not serializable.", $exception - ); - } - - try { - return yield $this->write->write(\pack("CL", 0, \strlen($data)) . $data); + return yield $this->write->write($this->parser->encode($data)); } catch (StreamException $exception) { throw new ChannelException("Sending on the channel failed. Did the context die?", $exception); } @@ -124,7 +68,7 @@ class ChannelledStream implements Channel { } try { - yield $this->parser->write($chunk); + $this->parser->push($chunk); } catch (StreamException $exception) { throw new ChannelException("Reading from the channel failed. Did the context die?", $exception); }