From bf90335bb6273091a38e2ffb8c3faeda4bd3986b Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sun, 30 Apr 2017 08:31:53 +0200 Subject: [PATCH 01/11] Major API change + merged Reader and Writer from amphp/socket --- composer.json | 2 +- lib/Buffer.php | 71 ++---------- lib/BufferIterator.php | 139 ----------------------- lib/BufferedInputStream.php | 58 ++++++++++ lib/BufferedOutputStream.php | 78 +++++++++++++ lib/DuplexStream.php | 6 - lib/InputStream.php | 45 ++++++++ lib/Internal/functions.php | 21 ---- lib/Message.php | 113 ------------------ lib/OutputStream.php | 43 +++++++ lib/Parser.php | 11 +- lib/PendingReadException.php | 17 +++ lib/ReadableStream.php | 22 ---- lib/ResourceInputStream.php | 132 +++++++++++++++++++++ lib/ResourceOutputStream.php | 214 +++++++++++++++++++++++++++++++++++ lib/WritableStream.php | 21 ---- lib/functions.php | 21 +++- test/MessageTest.php | 18 +-- 18 files changed, 628 insertions(+), 404 deletions(-) delete mode 100644 lib/BufferIterator.php create mode 100644 lib/BufferedInputStream.php create mode 100644 lib/BufferedOutputStream.php delete mode 100644 lib/DuplexStream.php create mode 100644 lib/InputStream.php delete mode 100644 lib/Internal/functions.php delete mode 100644 lib/Message.php create mode 100644 lib/OutputStream.php create mode 100644 lib/PendingReadException.php delete mode 100644 lib/ReadableStream.php create mode 100644 lib/ResourceInputStream.php create mode 100644 lib/ResourceOutputStream.php delete mode 100644 lib/WritableStream.php diff --git a/composer.json b/composer.json index e80adcd..e387f8e 100644 --- a/composer.json +++ b/composer.json @@ -21,7 +21,7 @@ "minimum-stability": "dev", "prefer-stable": true, "require": { - "amphp/amp": "dev-master as 2.0" + "amphp/amp": "dev-stream-refactor as 2.0" }, "require-dev": { "amphp/phpunit-util": "dev-master", diff --git a/lib/Buffer.php b/lib/Buffer.php index 97f0ff3..2444839 100644 --- a/lib/Buffer.php +++ b/lib/Buffer.php @@ -4,7 +4,7 @@ namespace Amp\ByteStream; /** */ -class Buffer implements \ArrayAccess, \Countable, \IteratorAggregate { +class Buffer { /** @var string */ private $data; @@ -26,13 +26,6 @@ class Buffer implements \ArrayAccess, \Countable, \IteratorAggregate { return \strlen($this->data); } - /** - * @return int - */ - public function count(): int { - return $this->getLength(); - } - /** * Determines if the buffer is empty. * @@ -168,7 +161,7 @@ class Buffer implements \ArrayAccess, \Countable, \IteratorAggregate { * @param string $string * @param int $position */ - public function insert(string $string, int $position) { + public function insert(string $string, int $position) { $this->data = \substr_replace($this->data, $string, $position, 0); } @@ -192,65 +185,15 @@ class Buffer implements \ArrayAccess, \Countable, \IteratorAggregate { * @param bool $reverse Start search from end of buffer. * * @return int|bool - * - * @see strpos() */ - public function search(string $string, bool $reverse = false) { + public function indexOf(string $string, bool $reverse = false): int { if ($reverse) { - return \strrpos($this->data, $string); + $result = \strrpos($this->data, $string); + } else { + $result = \strpos($this->data, $string); } - return \strpos($this->data, $string); - } - - /** - * Determines if the buffer contains the given position. - * - * @param int $index - * - * @return bool - */ - public function offsetExists($index) { - return isset($this->data[$index]); - } - - /** - * Returns the character in the buffer at the given position. - * - * @param int $index - * - * @return string - */ - public function offsetGet($index) { - return $this->data[$index]; - } - - /** - * Replaces the character in the buffer at the given position with the given string. - * - * @param int $index - * @param string $data - */ - public function offsetSet($index, $data) { - $this->data = \substr_replace($this->data, $data, $index, 1); - } - - /** - * Removes the character at the given index from the buffer. - * - * @param int $index - */ - public function offsetUnset($index) { - if (isset($this->data[$index])) { - $this->data = \substr_replace($this->data, null, $index, 1); - } - } - - /** - * @return \Iterator - */ - public function getIterator(): \Iterator { - return new BufferIterator($this); + return $result === false ? -1 : $result; } /** diff --git a/lib/BufferIterator.php b/lib/BufferIterator.php deleted file mode 100644 index 625c279..0000000 --- a/lib/BufferIterator.php +++ /dev/null @@ -1,139 +0,0 @@ -buffer = $buffer; - } - - /** - * Rewinds the iterator to the beginning of the buffer. - */ - public function rewind() { - $this->current = 0; - } - - /** - * Determines if the iterator is valid. - * - * @return bool - */ - public function valid(): bool { - return isset($this->buffer[$this->current]); - } - - /** - * Returns the current position (key) of the iterator. - * - * @return int - */ - public function key(): int { - return $this->current; - } - - /** - * Returns the current character in the buffer at the iterator position. - * - * @return string - */ - public function current(): string { - return $this->buffer[$this->current]; - } - - /** - * Moves to the next character in the buffer. - */ - public function next() { - ++$this->current; - } - - /** - * Moves to the previous character in the buffer. - */ - public function prev() { - --$this->current; - } - - /** - * Moves to the given position in the buffer. - * - * @param int $position - */ - public function seek($position) { - $position = (int) $position; - if (0 > $position) { - $position = 0; - } - - $this->current = $position; - } - - /** - * Inserts the given string into the buffer at the current iterator position. - * - * @param string $data - * - * @throws \OutOfBoundsException If the iterator is invalid. - */ - public function insert(string $data) { - if (!$this->valid()) { - throw new \OutOfBoundsException('The iterator is not valid!'); - } - - $this->buffer[$this->current] = $data . $this->buffer[$this->current]; - } - - /** - * Replaces the byte at the current iterator position with the given string. - * - * @param string $data - * - * @return string - * - * @throws \OutOfBoundsException If the iterator is invalid. - */ - public function replace(string $data): string { - if (!$this->valid()) { - throw new \OutOfBoundsException('The iterator is not valid!'); - } - - $temp = $this->buffer[$this->current]; - - $this->buffer[$this->current] = $data; - - return $temp; - } - - /** - * Removes the byte at the current iterator position and moves the iterator to the previous character. - * - * @return string - * - * @throws \OutOfBoundsException If the iterator is invalid. - */ - public function remove(): string { - if (!$this->valid()) { - throw new \OutOfBoundsException('The iterator is not valid!'); - } - - $temp = $this->buffer[$this->current]; - - unset($this->buffer[$this->current]); - - --$this->current; - - return $temp; - } -} diff --git a/lib/BufferedInputStream.php b/lib/BufferedInputStream.php new file mode 100644 index 0000000..09ae0c4 --- /dev/null +++ b/lib/BufferedInputStream.php @@ -0,0 +1,58 @@ +source = $source; + $this->chunkSize = $chunkSize; + } + + /** + * Reads data from the stream. + * + * @return Promise Resolves with a string when new data is available or `null` if the stream has closed. + * + * @throws PendingReadException Thrown if another read operation is still pending. + */ + public function read(): Promise { + if ($this->readOperation !== null) { + throw new PendingReadException; + } + + $this->readOperation = call(function () { + $buffer = ""; + + while (($chunk = yield $this->source->read()) !== null) { + $buffer .= ""; + + if (isset($buffer[$this->chunkSize - 1])) { + return $buffer; + } + } + + return $buffer; + }); + + return $this->readOperation; + } + + /** + * Closes the stream forcefully. Multiple `close()` calls are ignored. + * + * Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you + * want to allow half-closed duplex streams, you must use different objects for input and output. + * + * @return void + */ + public function close() { + $this->source->close(); + } +} \ No newline at end of file diff --git a/lib/BufferedOutputStream.php b/lib/BufferedOutputStream.php new file mode 100644 index 0000000..c939317 --- /dev/null +++ b/lib/BufferedOutputStream.php @@ -0,0 +1,78 @@ +destination = $destination; + $this->chunkSize = $chunkSize; + $this->buffer = new Buffer; + $this->closed = false; + } + + /** + * Writes data to the stream. + * + * @param string $data Bytes to write. + * + * @return Promise Succeeds once the data has been successfully written to the stream. + * + * @throws ClosedException If the stream has already been closed. + */ + public function write(string $data): Promise { + if ($this->closed) { + throw new ClosedException("The stream has already been closed"); + } + + $this->buffer->push($data); + + if ($this->buffer->getLength() < $this->chunkSize) { + return new Success; + } + + return $this->destination->write($this->buffer->drain()); + } + + /** + * Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before. + * + * @param string $finalData Bytes to write. + * + * @return Promise Succeeds once the data has been successfully written to the stream. + * + * @throws ClosedException If the stream has already been closed. + */ + public function end(string $finalData = ""): Promise { + $promise = $this->write($this->buffer->drain() . $finalData); + $promise->onResolve([$this, "close"]); + + return $promise; + } + + /** + * Closes the stream forcefully. Multiple `close()` calls are ignored. Successful streams should always be closed + * via `end()`. + * + * @return void + */ + public function close() { + $this->closed = true; + $this->destination->close(); + } +} \ No newline at end of file diff --git a/lib/DuplexStream.php b/lib/DuplexStream.php deleted file mode 100644 index c1aabe8..0000000 --- a/lib/DuplexStream.php +++ /dev/null @@ -1,6 +0,0 @@ -read()) !== null) { + * $buffer .= $chunk; + * } + * + * return $buffer; + * }); + * } + * ``` + */ +interface InputStream { + /** + * Reads data from the stream. + * + * @return Promise Resolves with a string when new data is available or `null` if the stream has closed. + * + * @throws PendingReadException Thrown if another read operation is still pending. + */ + public function read(): Promise; + + /** + * Closes the stream forcefully. Multiple `close()` calls are ignored. + * + * Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you + * want to allow half-closed duplex streams, you must use different objects for input and output. + * + * @return void + */ + public function close(); +} diff --git a/lib/Internal/functions.php b/lib/Internal/functions.php deleted file mode 100644 index 683249a..0000000 --- a/lib/Internal/functions.php +++ /dev/null @@ -1,21 +0,0 @@ -advance()) { - $data = $source->getChunk(); - $written += \strlen($data); - yield $destination->write($data); - } - - return $written; -} diff --git a/lib/Message.php b/lib/Message.php deleted file mode 100644 index dbc2126..0000000 --- a/lib/Message.php +++ /dev/null @@ -1,113 +0,0 @@ -advance()) { - * $chunk = $message->getChunk(); - * // Immediately use $chunk, reducing memory consumption since the entire message is never buffered. - * } - */ -class Message implements ReadableStream, Promise { - const LISTENING = 0; - const BUFFERING = 1; - const WAITING = 2; - const COMPLETE = 4; - - /** @var \Amp\StreamIterator|null */ - private $iterator; - - /** @var int */ - private $status = self::LISTENING; - - /** @var mixed Final emitted chunk. */ - private $result; - - /** @var \Amp\Deferred */ - private $deferred; - - /** - * @param \Amp\Stream $stream Stream that only emits strings. - */ - public function __construct(Stream $stream) { - $this->iterator = new StreamIterator($stream); - $this->deferred = new Deferred; - - $stream->onResolve(function ($exception, $value) { - if ($exception) { - $this->deferred->fail($exception); - return; - } - - $result = \implode($this->iterator->drain()); - $this->iterator = null; - $this->status = \strlen($result) ? self::BUFFERING : self::WAITING; - $this->result = $result; - $this->deferred->resolve($result); - }); - } - - /** - * {@inheritdoc} - */ - public function advance(): Promise { - if ($this->iterator) { - return $this->iterator->advance(); - } - - switch ($this->status) { - case self::BUFFERING: - $this->status = self::WAITING; - return new Success(true); - - case self::WAITING: - $this->status = self::COMPLETE; - return new Success(false); - - default: - throw new \Error("The stream has resolved"); - } - } - - /** - * {@inheritdoc} - */ - public function getChunk(): string { - if ($this->iterator) { - return $this->iterator->getCurrent(); - } - - switch ($this->status) { - case self::COMPLETE: - throw new \Error("The stream has resolved"); - - default: - return $this->result; - } - } - - /** - * {@inheritdoc} - */ - public function onResolve(callable $onResolved) { - $this->deferred->promise()->onResolve($onResolved); - } -} diff --git a/lib/OutputStream.php b/lib/OutputStream.php new file mode 100644 index 0000000..fa21aa3 --- /dev/null +++ b/lib/OutputStream.php @@ -0,0 +1,43 @@ +dispose(); + } +} \ No newline at end of file diff --git a/lib/PendingReadException.php b/lib/PendingReadException.php new file mode 100644 index 0000000..0b8e904 --- /dev/null +++ b/lib/PendingReadException.php @@ -0,0 +1,17 @@ +resource = $stream; + $this->emitter = new Emitter; + $this->iterator = $this->emitter->getIterator(); + $this->autoClose = $autoClose; + + $emitter = &$this->emitter; + + $this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use (&$emitter, $chunkSize) { + // Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes. + $data = @\fread($stream, $chunkSize); + + if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) { + Loop::cancel($watcher); + $temp = $emitter; + $emitter = null; + $temp->complete(); + return; + } + + Loop::disable($watcher); + + $emitter->emit($data)->onResolve(function ($exception) use (&$emitter, $watcher) { + if ($emitter !== null && $exception === null) { + Loop::enable($watcher); + } + }); + }); + } + + /** + * Reads data from the stream. + * + * @return Promise Resolves with a string when new data is available or `null` if the stream has closed. + * + * @throws PendingReadException Thrown if another read operation is still pending. + */ + public function read(): Promise { + if ($this->readOperation !== null) { + throw new PendingReadException; + } + + Loop::enable($this->watcher); + + $this->readOperation = call(function () { + if (yield $this->emitter->getIterator()->advance()) { + $this->readOperation = null; + return $this->emitter->getIterator()->getCurrent(); + } + + throw new ClosedException("The stream has been closed"); + }); + + return $this->readOperation; + } + + /** + * Closes the stream forcefully. Multiple `close()` calls are ignored. + * + * Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you + * want to allow half-closed duplex streams, you must use different objects for input and output. + * + * @return void + */ + public function close() { + if ($this->resource === null) { + return; + } + + if ($this->autoClose && \is_resource($this->resource)) { + @\fclose($this->resource); + } + + $this->resource = null; + + if ($this->emitter !== null) { + $temp = $this->emitter; + $this->emitter = null; + $temp->complete(); + } + + Loop::cancel($this->watcher); + } + + public function __destruct() { + if ($this->autoClose) { + $this->close(); + } + } +} \ No newline at end of file diff --git a/lib/ResourceOutputStream.php b/lib/ResourceOutputStream.php new file mode 100644 index 0000000..7234a91 --- /dev/null +++ b/lib/ResourceOutputStream.php @@ -0,0 +1,214 @@ +resource = $stream; + $this->autoClose = $autoClose; + + $writes = $this->writes = new \SplQueue; + $writable = &$this->writable; + + $this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, &$writable) { + try { + while (!$writes->isEmpty()) { + /** @var \Amp\Deferred $deferred */ + list($data, $previous, $deferred) = $writes->shift(); + $length = \strlen($data); + + if ($length === 0) { + $deferred->resolve(0); + continue; + } + + // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full. + $written = @\fwrite($stream, $data); + + if ($written === false || $written === 0) { + $writable = false; + + $message = "Failed to write to socket"; + if ($error = \error_get_last()) { + $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]); + } + $exception = new \Exception($message); + $deferred->fail($exception); + while (!$writes->isEmpty()) { + list(, , $deferred) = $writes->shift(); + $deferred->fail($exception); + } + return; + } + + if ($length <= $written) { + $deferred->resolve($written + $previous); + continue; + } + + $data = \substr($data, $written); + $writes->unshift([$data, $written + $previous, $deferred]); + return; + } + } finally { + if ($writes->isEmpty()) { + Loop::disable($watcher); + } + } + }); + + Loop::disable($this->watcher); + } + + /** + * Writes data to the stream. + * + * @param string $data Bytes to write. + * + * @return Promise Succeeds once the data has been successfully written to the stream. + * + * @throws ClosedException If the stream has already been closed. + */ + public function write(string $data): Promise { + return $this->send($data, false); + } + + /** + * Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before. + * + * @param string $finalData Bytes to write. + * + * @return Promise Succeeds once the data has been successfully written to the stream. + * + * @throws ClosedException If the stream has already been closed. + */ + public function end(string $finalData = ""): Promise { + return $this->send($finalData, true); + } + + /** + * @param string $data + * @param bool $end + * + * @return Promise + */ + private function send(string $data, bool $end = false): Promise { + if (!$this->writable) { + return new Failure(new \Exception("The stream is not writable")); + } + + $length = \strlen($data); + $written = 0; + + if ($end) { + $this->writable = false; + } + + if ($this->writes->isEmpty()) { + if ($length === 0) { + if ($end) { + $this->close(); + } + return new Success(0); + } + + // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full. + $written = @\fwrite($this->resource, $data); + + if ($written === false) { + $message = "Failed to write to stream"; + if ($error = \error_get_last()) { + $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]); + } + return new Failure(new StreamException($message)); + } + + if ($length <= $written) { + if ($end) { + $this->close(); + } + return new Success($written); + } + + $data = \substr($data, $written); + } + + $deferred = new Deferred; + $this->writes->push([$data, $written, $deferred]); + Loop::enable($this->watcher); + $promise = $deferred->promise(); + + if ($end) { + $promise->onResolve([$this, 'close']); + } + + return $promise; + } + + /** + * @inheritdoc + */ + public function close() { + if ($this->resource !== null) { + return; + } + + if (\is_resource($this->resource)) { + @\fclose($this->resource); + } + + $this->resource = null; + $this->writable = false; + + if (!$this->writes->isEmpty()) { + $exception = new ClosedException("The socket was closed before writing completed"); + do { + /** @var \Amp\Deferred $deferred */ + list(, , $deferred) = $this->writes->shift(); + $deferred->fail($exception); + } while (!$this->writes->isEmpty()); + } + + Loop::cancel($this->watcher); + } + + public function __destruct() { + if ($this->autoClose) { + $this->close(); + } + } +} \ No newline at end of file diff --git a/lib/WritableStream.php b/lib/WritableStream.php deleted file mode 100644 index 99f39cc..0000000 --- a/lib/WritableStream.php +++ /dev/null @@ -1,21 +0,0 @@ -read()) !== null) { + $written += \strlen($chunk); + yield $destination->write($chunk); + } + + return $written; + }); +} \ No newline at end of file diff --git a/test/MessageTest.php b/test/MessageTest.php index a20f045..ff4d808 100644 --- a/test/MessageTest.php +++ b/test/MessageTest.php @@ -2,7 +2,7 @@ namespace Amp\ByteStream\Test; -use Amp\ByteStream\Message; +use Amp\ByteStream\IteratorStream; use Amp\Emitter; use Amp\Loop; use Amp\PHPUnit\TestCase; @@ -14,7 +14,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $message = new Message($emitter->stream()); + $message = new IteratorStream($emitter->stream()); foreach ($values as $value) { $emitter->emit($value); @@ -33,7 +33,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $message = new Message($emitter->stream()); + $message = new IteratorStream($emitter->stream()); foreach ($values as $value) { $emitter->emit($value); @@ -58,7 +58,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $message = new Message($emitter->stream()); + $message = new IteratorStream($emitter->stream()); foreach ($values as $value) { $emitter->emit($value); @@ -80,7 +80,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $message = new Message($emitter->stream()); + $message = new IteratorStream($emitter->stream()); foreach ($values as $value) { $emitter->emit($value); @@ -105,7 +105,7 @@ class MessageTest extends TestCase { $value = "abc"; $emitter = new Emitter; - $message = new Message($emitter->stream()); + $message = new IteratorStream($emitter->stream()); $emitter->emit($value); $emitter->fail($exception); @@ -123,7 +123,7 @@ class MessageTest extends TestCase { public function testEmptyStream() { Loop::run(function () { $value = 1; - $message = new Message(new Success($value)); + $message = new IteratorStream(new Success($value)); $this->assertFalse(yield $message->advance()); }); @@ -138,7 +138,7 @@ class MessageTest extends TestCase { $value = "abc"; $emitter = new Emitter; - $message = new Message($emitter->stream()); + $message = new IteratorStream($emitter->stream()); $emitter->emit($value); $emitter->resolve(); @@ -158,7 +158,7 @@ class MessageTest extends TestCase { $value = "abc"; $emitter = new Emitter; - $message = new Message($emitter->stream()); + $message = new IteratorStream($emitter->stream()); $emitter->emit($value); $emitter->resolve(); From f3536494e9132fb7afaeb92c4a9a104bdb780f17 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 4 May 2017 16:30:13 -0500 Subject: [PATCH 02/11] Fix conflicts in Parser --- lib/Parser.php | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/Parser.php b/lib/Parser.php index 1f521c4..899ed60 100644 --- a/lib/Parser.php +++ b/lib/Parser.php @@ -8,8 +8,6 @@ use Amp\Promise; use Amp\Success; class Parser implements OutputStream { - const CHUNK_SIZE = 8192; - /** @var \Generator */ private $generator; @@ -55,7 +53,7 @@ class Parser implements OutputStream { * @return string */ public function cancel(): string { - $this->generator = null; + $this->close(); return $this->buffer; } @@ -143,6 +141,6 @@ class Parser implements OutputStream { * @inheritdoc */ public function close() { - $this->dispose(); + $this->generator = null; } } \ No newline at end of file From 523ce79dabe331357010a5b1cd886aaff2da5249 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 4 May 2017 16:30:24 -0500 Subject: [PATCH 03/11] Update composer.json --- composer.json | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/composer.json b/composer.json index e387f8e..2734a58 100644 --- a/composer.json +++ b/composer.json @@ -21,7 +21,7 @@ "minimum-stability": "dev", "prefer-stable": true, "require": { - "amphp/amp": "dev-stream-refactor as 2.0" + "amphp/amp": "^2.0" }, "require-dev": { "amphp/phpunit-util": "dev-master", @@ -33,8 +33,7 @@ "Amp\\ByteStream\\": "lib/" }, "files": [ - "lib/functions.php", - "lib/Internal/functions.php" + "lib/functions.php" ] }, "autoload-dev": { From 4b7a537d7bbf2a4b862dc0e07d67bbf04537735c Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 4 May 2017 16:30:40 -0500 Subject: [PATCH 04/11] Refactor ResourceInputStream for new API --- lib/ResourceInputStream.php | 85 +++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/lib/ResourceInputStream.php b/lib/ResourceInputStream.php index 11f4180..d5df944 100644 --- a/lib/ResourceInputStream.php +++ b/lib/ResourceInputStream.php @@ -2,39 +2,40 @@ namespace Amp\ByteStream; -use Amp\Emitter; -use Amp\Iterator; +use Amp\Deferred; +use Amp\Failure; use Amp\Loop; use Amp\Promise; -use function Amp\call; class ResourceInputStream implements InputStream { + const DEFAULT_CHUNK_SIZE = 8192; + /** @var resource */ private $resource; /** @var string */ private $watcher; - /** @var Emitter */ - private $emitter; + /** @var \Amp\Deferred|null */ + private $deferred; - /** @var Iterator */ - private $iterator; + /** @var bool */ + private $readable = true; /** @var bool */ private $autoClose = true; - /** @var Promise|null */ - private $readOperation; - - public function __construct($stream, int $chunkSize = 8192, $autoClose = true) { + public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE, $autoClose = true) { if (!is_resource($stream) || get_resource_type($stream) !== 'stream') { throw new \Error("Expected a valid stream"); } $meta = \stream_get_meta_data($stream); - if (isset($meta["mode"]) && $meta["mode"] !== "" && strpos($meta["mode"], "r") === false && strpos($meta["mode"], "+") === false) { + if (isset($meta["mode"]) && $meta["mode"] !== "" + && strpos($meta["mode"], "r") === false + && strpos($meta["mode"], "+") === false + ) { throw new \Error("Expected a readable stream"); } @@ -42,32 +43,37 @@ class ResourceInputStream implements InputStream { \stream_set_read_buffer($stream, 0); $this->resource = $stream; - $this->emitter = new Emitter; - $this->iterator = $this->emitter->getIterator(); $this->autoClose = $autoClose; - $emitter = &$this->emitter; + $deferred = &$this->deferred; + $readable = &$this->readable; + + $this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use ( + &$deferred, &$readable, $chunkSize + ) { + if ($deferred === null) { + return; + } - $this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use (&$emitter, $chunkSize) { // Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes. $data = @\fread($stream, $chunkSize); if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) { + $readable = false; Loop::cancel($watcher); - $temp = $emitter; - $emitter = null; - $temp->complete(); - return; + $data = null; // Stream closed, resolve read with null. } - Loop::disable($watcher); + $temp = $deferred; + $deferred = null; + $temp->resolve($data); - $emitter->emit($data)->onResolve(function ($exception) use (&$emitter, $watcher) { - if ($emitter !== null && $exception === null) { - Loop::enable($watcher); - } - }); + if ($deferred === null) { // Only disable watcher if no further read was requested. + Loop::disable($watcher); + } }); + + Loop::disable($this->watcher); } /** @@ -78,22 +84,18 @@ class ResourceInputStream implements InputStream { * @throws PendingReadException Thrown if another read operation is still pending. */ public function read(): Promise { - if ($this->readOperation !== null) { + if ($this->deferred !== null) { throw new PendingReadException; } + if (!$this->readable) { + return new Failure(new ClosedException("The stream has been closed")); + } + + $this->deferred = new Deferred; Loop::enable($this->watcher); - $this->readOperation = call(function () { - if (yield $this->emitter->getIterator()->advance()) { - $this->readOperation = null; - return $this->emitter->getIterator()->getCurrent(); - } - - throw new ClosedException("The stream has been closed"); - }); - - return $this->readOperation; + return $this->deferred->promise(); } /** @@ -114,11 +116,12 @@ class ResourceInputStream implements InputStream { } $this->resource = null; + $this->readable = false; - if ($this->emitter !== null) { - $temp = $this->emitter; - $this->emitter = null; - $temp->complete(); + if ($this->deferred !== null) { + $deferred = $this->deferred; + $this->deferred = null; + $deferred->resolve(null); } Loop::cancel($this->watcher); From 622f98c9668029eafc684ca299a4911804fd2d53 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 5 May 2017 15:42:18 +0200 Subject: [PATCH 05/11] Improve readability, written is never larger than length --- lib/ResourceOutputStream.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ResourceOutputStream.php b/lib/ResourceOutputStream.php index 7234a91..aaa85fd 100644 --- a/lib/ResourceOutputStream.php +++ b/lib/ResourceOutputStream.php @@ -157,7 +157,7 @@ class ResourceOutputStream implements OutputStream { return new Failure(new StreamException($message)); } - if ($length <= $written) { + if ($length === $written) { if ($end) { $this->close(); } From 73e6441fd964e164492cbb54ebe62d63214886e7 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 5 May 2017 16:45:53 +0200 Subject: [PATCH 06/11] Add getResource() to resource streams --- lib/ResourceInputStream.php | 4 ++++ lib/ResourceOutputStream.php | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/lib/ResourceInputStream.php b/lib/ResourceInputStream.php index d5df944..72e7c1d 100644 --- a/lib/ResourceInputStream.php +++ b/lib/ResourceInputStream.php @@ -127,6 +127,10 @@ class ResourceInputStream implements InputStream { Loop::cancel($this->watcher); } + public function getResource() { + return $this->resource; + } + public function __destruct() { if ($this->autoClose) { $this->close(); diff --git a/lib/ResourceOutputStream.php b/lib/ResourceOutputStream.php index aaa85fd..ea502d2 100644 --- a/lib/ResourceOutputStream.php +++ b/lib/ResourceOutputStream.php @@ -206,6 +206,10 @@ class ResourceOutputStream implements OutputStream { Loop::cancel($this->watcher); } + public function getResource() { + return $this->resource; + } + public function __destruct() { if ($this->autoClose) { $this->close(); From 9714de66aecddf3eb094274761d1aea573ed0a0b Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 5 May 2017 16:47:45 +0200 Subject: [PATCH 07/11] Remove buffered stream implementations --- lib/BufferedInputStream.php | 58 --------------------------- lib/BufferedOutputStream.php | 78 ------------------------------------ 2 files changed, 136 deletions(-) delete mode 100644 lib/BufferedInputStream.php delete mode 100644 lib/BufferedOutputStream.php diff --git a/lib/BufferedInputStream.php b/lib/BufferedInputStream.php deleted file mode 100644 index 09ae0c4..0000000 --- a/lib/BufferedInputStream.php +++ /dev/null @@ -1,58 +0,0 @@ -source = $source; - $this->chunkSize = $chunkSize; - } - - /** - * Reads data from the stream. - * - * @return Promise Resolves with a string when new data is available or `null` if the stream has closed. - * - * @throws PendingReadException Thrown if another read operation is still pending. - */ - public function read(): Promise { - if ($this->readOperation !== null) { - throw new PendingReadException; - } - - $this->readOperation = call(function () { - $buffer = ""; - - while (($chunk = yield $this->source->read()) !== null) { - $buffer .= ""; - - if (isset($buffer[$this->chunkSize - 1])) { - return $buffer; - } - } - - return $buffer; - }); - - return $this->readOperation; - } - - /** - * Closes the stream forcefully. Multiple `close()` calls are ignored. - * - * Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you - * want to allow half-closed duplex streams, you must use different objects for input and output. - * - * @return void - */ - public function close() { - $this->source->close(); - } -} \ No newline at end of file diff --git a/lib/BufferedOutputStream.php b/lib/BufferedOutputStream.php deleted file mode 100644 index c939317..0000000 --- a/lib/BufferedOutputStream.php +++ /dev/null @@ -1,78 +0,0 @@ -destination = $destination; - $this->chunkSize = $chunkSize; - $this->buffer = new Buffer; - $this->closed = false; - } - - /** - * Writes data to the stream. - * - * @param string $data Bytes to write. - * - * @return Promise Succeeds once the data has been successfully written to the stream. - * - * @throws ClosedException If the stream has already been closed. - */ - public function write(string $data): Promise { - if ($this->closed) { - throw new ClosedException("The stream has already been closed"); - } - - $this->buffer->push($data); - - if ($this->buffer->getLength() < $this->chunkSize) { - return new Success; - } - - return $this->destination->write($this->buffer->drain()); - } - - /** - * Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before. - * - * @param string $finalData Bytes to write. - * - * @return Promise Succeeds once the data has been successfully written to the stream. - * - * @throws ClosedException If the stream has already been closed. - */ - public function end(string $finalData = ""): Promise { - $promise = $this->write($this->buffer->drain() . $finalData); - $promise->onResolve([$this, "close"]); - - return $promise; - } - - /** - * Closes the stream forcefully. Multiple `close()` calls are ignored. Successful streams should always be closed - * via `end()`. - * - * @return void - */ - public function close() { - $this->closed = true; - $this->destination->close(); - } -} \ No newline at end of file From 85cd222d2f7250ade2e9ccf788163f1aa2d5db0b Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 5 May 2017 16:52:06 +0200 Subject: [PATCH 08/11] Add simple benchmark --- examples/benchmark-throughput.php | 53 +++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 examples/benchmark-throughput.php diff --git a/examples/benchmark-throughput.php b/examples/benchmark-throughput.php new file mode 100644 index 0000000..6ba6d64 --- /dev/null +++ b/examples/benchmark-throughput.php @@ -0,0 +1,53 @@ +write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL); +} + +try { + if (!@\assert(false)) { + $stderr->write("NOTICE: Assertions are enabled, this has a major impact on performance." . PHP_EOL); + } +} catch (AssertionError $exception) { + $stderr->write("NOTICE: Assertions are enabled, this has a major impact on performance." . PHP_EOL); +} + +$stderr->write('piping from ' . $if . ' to ' . $of . ' (for max ' . $t . ' second(s)) ...'. PHP_EOL); + +Loop::delay($t * 1000, [$in, "close"]); + +Loop::run(function () use ($stderr, $in, $out) { + $start = microtime(true); + + while (($chunk = yield $in->read()) !== null) { + yield $out->write($chunk); + } + + $t = microtime(true) - $start; + + $bytes = ftell($out->getResource()); + + $stderr->write('read ' . $bytes . ' byte(s) in ' . round($t, 3) . ' second(s) => ' . round($bytes / 1024 / 1024 / $t, 1) . ' MiB/s' . PHP_EOL); + $stderr->write('peak memory usage of ' . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . ' MiB' . PHP_EOL); +}); \ No newline at end of file From bcefe903c9b2a57f1698a80a0adf67fa6158ed83 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 5 May 2017 21:38:43 +0200 Subject: [PATCH 09/11] Add ReactPHP credit --- examples/benchmark-throughput.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/benchmark-throughput.php b/examples/benchmark-throughput.php index 6ba6d64..1b25e54 100644 --- a/examples/benchmark-throughput.php +++ b/examples/benchmark-throughput.php @@ -1,5 +1,8 @@ Date: Fri, 5 May 2017 22:39:39 +0200 Subject: [PATCH 10/11] Add GzipInputStream --- examples/gzip-decompress.php | 19 ++++++++++ lib/GzipInputStream.php | 72 ++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 examples/gzip-decompress.php create mode 100644 lib/GzipInputStream.php diff --git a/examples/gzip-decompress.php b/examples/gzip-decompress.php new file mode 100644 index 0000000..8e01084 --- /dev/null +++ b/examples/gzip-decompress.php @@ -0,0 +1,19 @@ +read()) !== null) { + yield $stdout->write($chunk); + } +}); \ No newline at end of file diff --git a/lib/GzipInputStream.php b/lib/GzipInputStream.php new file mode 100644 index 0000000..d943d2b --- /dev/null +++ b/lib/GzipInputStream.php @@ -0,0 +1,72 @@ +source = $source; + $this->resource = \inflate_init(\ZLIB_ENCODING_GZIP); + + if ($this->resource === false) { + throw new StreamException("Failed initializing deflate context"); + } + } + + public function read(): Promise { + return call(function () { + if ($this->state === self::STATE_ENDED) { + throw new ClosedException("The stream has already been closed"); + } + + if ($this->state === self::STATE_ENDING) { + $this->state = self::STATE_ENDED; + return null; + } + + if ($this->state === self::STATE_FAILED) { + throw new StreamException("The stream has previously failed"); + } + + $data = yield $this->source->read(); + + if ($data === null) { + $decompressed = \inflate_add($this->resource, "", \ZLIB_FINISH); + + $this->state = self::STATE_ENDING; + + if ($decompressed === false) { + $this->state = self::STATE_FAILED; + throw new StreamException("Failed adding data to deflate context"); + } + + return $decompressed; + } + + $decompressed = \inflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH); + + if ($decompressed === false) { + $this->state = self::STATE_FAILED; + throw new StreamException("Failed adding data to deflate context"); + } + + return $decompressed; + }); + } + + public function close() { + $this->state = self::STATE_ENDED; + $this->source->close(); + } +} \ No newline at end of file From 23f5ba91b96773afed87766de5306d22715e99bd Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sun, 7 May 2017 11:14:41 +0200 Subject: [PATCH 11/11] Simplify GzipInputStream by removing the state tracking --- lib/GzipInputStream.php | 32 ++++++++------------------------ 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/lib/GzipInputStream.php b/lib/GzipInputStream.php index d943d2b..47c7b89 100644 --- a/lib/GzipInputStream.php +++ b/lib/GzipInputStream.php @@ -6,14 +6,8 @@ use Amp\Promise; use function Amp\call; class GzipInputStream implements InputStream { - const STATE_FAILED = -1; - const STATE_NORMAL = 0; - const STATE_ENDING = 1; - const STATE_ENDED = 2; - private $source; private $resource; - private $state = 0; public function __construct(InputStream $source) { $this->source = $source; @@ -26,38 +20,27 @@ class GzipInputStream implements InputStream { public function read(): Promise { return call(function () { - if ($this->state === self::STATE_ENDED) { - throw new ClosedException("The stream has already been closed"); - } - - if ($this->state === self::STATE_ENDING) { - $this->state = self::STATE_ENDED; - return null; - } - - if ($this->state === self::STATE_FAILED) { - throw new StreamException("The stream has previously failed"); - } - $data = yield $this->source->read(); if ($data === null) { + if ($this->resource === null) { + return null; + } + $decompressed = \inflate_add($this->resource, "", \ZLIB_FINISH); - $this->state = self::STATE_ENDING; - if ($decompressed === false) { - $this->state = self::STATE_FAILED; throw new StreamException("Failed adding data to deflate context"); } + $this->resource = null; + return $decompressed; } $decompressed = \inflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH); if ($decompressed === false) { - $this->state = self::STATE_FAILED; throw new StreamException("Failed adding data to deflate context"); } @@ -66,7 +49,8 @@ class GzipInputStream implements InputStream { } public function close() { - $this->state = self::STATE_ENDED; + $this->resource = null; + $this->source->close(); } } \ No newline at end of file