1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-11-30 04:19:23 +01:00

Merge pull request #3 from amphp/kelunik-api-proposal

Major API change + merged Reader and Writer from amphp/socket
This commit is contained in:
Aaron Piotrowski 2017-05-07 09:52:28 -05:00 committed by GitHub
commit 162ca40807
19 changed files with 636 additions and 409 deletions

View File

@ -21,7 +21,7 @@
"minimum-stability": "dev",
"prefer-stable": true,
"require": {
"amphp/amp": "dev-master as 2.0"
"amphp/amp": "^2.0"
},
"require-dev": {
"amphp/phpunit-util": "dev-master",
@ -33,8 +33,7 @@
"Amp\\ByteStream\\": "lib/"
},
"files": [
"lib/functions.php",
"lib/Internal/functions.php"
"lib/functions.php"
]
},
"autoload-dev": {

View File

@ -0,0 +1,56 @@
<?php
// Adopted from ReactPHP's stream package
// https://github.com/reactphp/stream/blob/b996af99fd1169ff74e93ef69c1513b7d0db19d0/examples/benchmark-throughput.php
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;
require __DIR__ . '/../vendor/autoload.php';
Loop::set(new Loop\NativeDriver());
$args = getopt('i:o:t:');
$if = isset($args['i']) ? $args['i'] : '/dev/zero';
$of = isset($args['o']) ? $args['o'] : '/dev/null';
$t = isset($args['t']) ? $args['t'] : 30;
// passing file descriptors requires mapping paths (https://bugs.php.net/bug.php?id=53465)
$if = preg_replace('(^/dev/fd/)', 'php://fd/', $if);
$of = preg_replace('(^/dev/fd/)', 'php://fd/', $of);
$stderr = new ResourceOutputStream(STDERR);
$in = new ResourceInputStream(fopen($if, 'r'), 65536 /* Default size used by React to allow comparisons */);
$out = new ResourceOutputStream(fopen($of, 'w'));
if (extension_loaded('xdebug')) {
$stderr->write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL);
}
try {
if (!@\assert(false)) {
$stderr->write("NOTICE: Assertions are enabled, this has a major impact on performance." . PHP_EOL);
}
} catch (AssertionError $exception) {
$stderr->write("NOTICE: Assertions are enabled, this has a major impact on performance." . PHP_EOL);
}
$stderr->write('piping from ' . $if . ' to ' . $of . ' (for max ' . $t . ' second(s)) ...'. PHP_EOL);
Loop::delay($t * 1000, [$in, "close"]);
Loop::run(function () use ($stderr, $in, $out) {
$start = microtime(true);
while (($chunk = yield $in->read()) !== null) {
yield $out->write($chunk);
}
$t = microtime(true) - $start;
$bytes = ftell($out->getResource());
$stderr->write('read ' . $bytes . ' byte(s) in ' . round($t, 3) . ' second(s) => ' . round($bytes / 1024 / 1024 / $t, 1) . ' MiB/s' . PHP_EOL);
$stderr->write('peak memory usage of ' . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . ' MiB' . PHP_EOL);
});

View File

@ -0,0 +1,19 @@
<?php
use Amp\ByteStream\GzipInputStream;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;
require __DIR__ . "/../vendor/autoload.php";
Loop::run(function () {
$stdin = new ResourceInputStream(STDIN);
$stdout = new ResourceOutputStream(STDOUT);
$gzin = new GzipInputStream($stdin);
while (($chunk = yield $gzin->read()) !== null) {
yield $stdout->write($chunk);
}
});

View File

@ -4,7 +4,7 @@ namespace Amp\ByteStream;
/**
*/
class Buffer implements \ArrayAccess, \Countable, \IteratorAggregate {
class Buffer {
/** @var string */
private $data;
@ -26,13 +26,6 @@ class Buffer implements \ArrayAccess, \Countable, \IteratorAggregate {
return \strlen($this->data);
}
/**
* @return int
*/
public function count(): int {
return $this->getLength();
}
/**
* Determines if the buffer is empty.
*
@ -168,7 +161,7 @@ class Buffer implements \ArrayAccess, \Countable, \IteratorAggregate {
* @param string $string
* @param int $position
*/
public function insert(string $string, int $position) {
public function insert(string $string, int $position) {
$this->data = \substr_replace($this->data, $string, $position, 0);
}
@ -192,65 +185,15 @@ class Buffer implements \ArrayAccess, \Countable, \IteratorAggregate {
* @param bool $reverse Start search from end of buffer.
*
* @return int|bool
*
* @see strpos()
*/
public function search(string $string, bool $reverse = false) {
public function indexOf(string $string, bool $reverse = false): int {
if ($reverse) {
return \strrpos($this->data, $string);
$result = \strrpos($this->data, $string);
} else {
$result = \strpos($this->data, $string);
}
return \strpos($this->data, $string);
}
/**
* Determines if the buffer contains the given position.
*
* @param int $index
*
* @return bool
*/
public function offsetExists($index) {
return isset($this->data[$index]);
}
/**
* Returns the character in the buffer at the given position.
*
* @param int $index
*
* @return string
*/
public function offsetGet($index) {
return $this->data[$index];
}
/**
* Replaces the character in the buffer at the given position with the given string.
*
* @param int $index
* @param string $data
*/
public function offsetSet($index, $data) {
$this->data = \substr_replace($this->data, $data, $index, 1);
}
/**
* Removes the character at the given index from the buffer.
*
* @param int $index
*/
public function offsetUnset($index) {
if (isset($this->data[$index])) {
$this->data = \substr_replace($this->data, null, $index, 1);
}
}
/**
* @return \Iterator
*/
public function getIterator(): \Iterator {
return new BufferIterator($this);
return $result === false ? -1 : $result;
}
/**

View File

@ -1,139 +0,0 @@
<?php
namespace Amp\ByteStream;
/**
*/
class BufferIterator implements \SeekableIterator {
/** @var \Amp\ByteStream\Buffer */
private $buffer;
/** @var int */
private $current = 0;
/**
* @param \Amp\ByteStream\Buffer $buffer
*/
public function __construct(Buffer $buffer) {
$this->buffer = $buffer;
}
/**
* Rewinds the iterator to the beginning of the buffer.
*/
public function rewind() {
$this->current = 0;
}
/**
* Determines if the iterator is valid.
*
* @return bool
*/
public function valid(): bool {
return isset($this->buffer[$this->current]);
}
/**
* Returns the current position (key) of the iterator.
*
* @return int
*/
public function key(): int {
return $this->current;
}
/**
* Returns the current character in the buffer at the iterator position.
*
* @return string
*/
public function current(): string {
return $this->buffer[$this->current];
}
/**
* Moves to the next character in the buffer.
*/
public function next() {
++$this->current;
}
/**
* Moves to the previous character in the buffer.
*/
public function prev() {
--$this->current;
}
/**
* Moves to the given position in the buffer.
*
* @param int $position
*/
public function seek($position) {
$position = (int) $position;
if (0 > $position) {
$position = 0;
}
$this->current = $position;
}
/**
* Inserts the given string into the buffer at the current iterator position.
*
* @param string $data
*
* @throws \OutOfBoundsException If the iterator is invalid.
*/
public function insert(string $data) {
if (!$this->valid()) {
throw new \OutOfBoundsException('The iterator is not valid!');
}
$this->buffer[$this->current] = $data . $this->buffer[$this->current];
}
/**
* Replaces the byte at the current iterator position with the given string.
*
* @param string $data
*
* @return string
*
* @throws \OutOfBoundsException If the iterator is invalid.
*/
public function replace(string $data): string {
if (!$this->valid()) {
throw new \OutOfBoundsException('The iterator is not valid!');
}
$temp = $this->buffer[$this->current];
$this->buffer[$this->current] = $data;
return $temp;
}
/**
* Removes the byte at the current iterator position and moves the iterator to the previous character.
*
* @return string
*
* @throws \OutOfBoundsException If the iterator is invalid.
*/
public function remove(): string {
if (!$this->valid()) {
throw new \OutOfBoundsException('The iterator is not valid!');
}
$temp = $this->buffer[$this->current];
unset($this->buffer[$this->current]);
--$this->current;
return $temp;
}
}

View File

@ -1,6 +0,0 @@
<?php
namespace Amp\ByteStream;
interface DuplexStream extends ReadableStream, WritableStream {
}

56
lib/GzipInputStream.php Normal file
View File

@ -0,0 +1,56 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
use function Amp\call;
class GzipInputStream implements InputStream {
private $source;
private $resource;
public function __construct(InputStream $source) {
$this->source = $source;
$this->resource = \inflate_init(\ZLIB_ENCODING_GZIP);
if ($this->resource === false) {
throw new StreamException("Failed initializing deflate context");
}
}
public function read(): Promise {
return call(function () {
$data = yield $this->source->read();
if ($data === null) {
if ($this->resource === null) {
return null;
}
$decompressed = \inflate_add($this->resource, "", \ZLIB_FINISH);
if ($decompressed === false) {
throw new StreamException("Failed adding data to deflate context");
}
$this->resource = null;
return $decompressed;
}
$decompressed = \inflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH);
if ($decompressed === false) {
throw new StreamException("Failed adding data to deflate context");
}
return $decompressed;
});
}
public function close() {
$this->resource = null;
$this->source->close();
}
}

45
lib/InputStream.php Normal file
View File

@ -0,0 +1,45 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
/**
* An `InputStream` allows reading byte streams in chunks.
*
* **Example**
*
* ```php
* function readAll(InputStream $in): Promise {
* return Amp\call(function () use ($in) {
* $buffer = "";
*
* while (($chunk = yield $in->read()) !== null) {
* $buffer .= $chunk;
* }
*
* return $buffer;
* });
* }
* ```
*/
interface InputStream {
/**
* Reads data from the stream.
*
* @return Promise Resolves with a string when new data is available or `null` if the stream has closed.
*
* @throws PendingReadException Thrown if another read operation is still pending.
*/
public function read(): Promise;
/**
* Closes the stream forcefully. Multiple `close()` calls are ignored.
*
* Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you
* want to allow half-closed duplex streams, you must use different objects for input and output.
*
* @return void
*/
public function close();
}

View File

@ -1,21 +0,0 @@
<?php
namespace Amp\ByteStream\Internal;
use Amp\ByteStream\ReadableStream;
use Amp\ByteStream\WritableStream;
/**
* @internal
*/
function pipe(ReadableStream $source, WritableStream $destination): \Generator {
$written = 0;
while (yield $source->advance()) {
$data = $source->getChunk();
$written += \strlen($data);
yield $destination->write($data);
}
return $written;
}

View File

@ -1,113 +0,0 @@
<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Promise;
use Amp\Stream;
use Amp\StreamIterator;
use Amp\Success;
/**
* Creates a buffered message from a Stream. The message can be consumed in chunks using the advance() and getChunk()
* methods or it may be buffered and accessed in its entirety by waiting for the promise to resolve.
*
* Buffering Example:
*
* $message = new Message($stream); // $stream is an instance of \Amp\Stream emitting only strings.
* $content = yield $message;
*
* Streaming Example:
*
* $message = new Message($stream); // $stream is a Stream emitting only strings.
*
* while (yield $message->advance()) {
* $chunk = $message->getChunk();
* // Immediately use $chunk, reducing memory consumption since the entire message is never buffered.
* }
*/
class Message implements ReadableStream, Promise {
const LISTENING = 0;
const BUFFERING = 1;
const WAITING = 2;
const COMPLETE = 4;
/** @var \Amp\StreamIterator|null */
private $iterator;
/** @var int */
private $status = self::LISTENING;
/** @var mixed Final emitted chunk. */
private $result;
/** @var \Amp\Deferred */
private $deferred;
/**
* @param \Amp\Stream $stream Stream that only emits strings.
*/
public function __construct(Stream $stream) {
$this->iterator = new StreamIterator($stream);
$this->deferred = new Deferred;
$stream->onResolve(function ($exception, $value) {
if ($exception) {
$this->deferred->fail($exception);
return;
}
$result = \implode($this->iterator->drain());
$this->iterator = null;
$this->status = \strlen($result) ? self::BUFFERING : self::WAITING;
$this->result = $result;
$this->deferred->resolve($result);
});
}
/**
* {@inheritdoc}
*/
public function advance(): Promise {
if ($this->iterator) {
return $this->iterator->advance();
}
switch ($this->status) {
case self::BUFFERING:
$this->status = self::WAITING;
return new Success(true);
case self::WAITING:
$this->status = self::COMPLETE;
return new Success(false);
default:
throw new \Error("The stream has resolved");
}
}
/**
* {@inheritdoc}
*/
public function getChunk(): string {
if ($this->iterator) {
return $this->iterator->getCurrent();
}
switch ($this->status) {
case self::COMPLETE:
throw new \Error("The stream has resolved");
default:
return $this->result;
}
}
/**
* {@inheritdoc}
*/
public function onResolve(callable $onResolved) {
$this->deferred->promise()->onResolve($onResolved);
}
}

43
lib/OutputStream.php Normal file
View File

@ -0,0 +1,43 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
/**
* An `OutputStream` allows writing data in chunks. Writers can wait on the returned promises to feel the backpressure.
*/
interface OutputStream {
/**
* Writes data to the stream.
*
* @param string $data Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
*/
public function write(string $data): Promise;
/**
* Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before.
*
* @param string $finalData Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
*/
public function end(string $finalData = ""): Promise;
/**
* Closes the stream forcefully. Multiple `close()` calls are ignored. Successful streams should always be closed
* via `end()`.
*
* Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you
* want to allow half-closed duplex streams, you must use different objects for input and output.
*
* @return void
*/
public function close();
}

View File

@ -7,9 +7,7 @@ use Amp\InvalidYieldError;
use Amp\Promise;
use Amp\Success;
class Parser implements WritableStream {
const CHUNK_SIZE = 8192;
class Parser implements OutputStream {
/** @var \Generator */
private $generator;
@ -55,7 +53,7 @@ class Parser implements WritableStream {
* @return string
*/
public function cancel(): string {
$this->generator = null;
$this->close();
return $this->buffer;
}
@ -138,4 +136,11 @@ class Parser implements WritableStream {
}
}
}
}
/**
* @inheritdoc
*/
public function close() {
$this->generator = null;
}
}

View File

@ -0,0 +1,17 @@
<?php
namespace Amp\ByteStream;
use Throwable;
/**
* Thrown in case a second read operation is attempted while another read operation is still pending.
*/
class PendingReadException extends StreamException {
public function __construct(
$message = "The previous read operation must complete before read can be called again",
$code = 0,
Throwable $previous = null
) {
parent::__construct($message, $code, $previous);
}
}

View File

@ -1,22 +0,0 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
interface ReadableStream {
/**
* Returns a promise that resolves with a boolean, true if there is another chunk available, false if the stream
* has ended.
*
* @return Promise
*/
public function advance(): Promise;
/**
* Gets the current chunk that arrived on the stream.
*
* @return string
*/
public function getChunk(): string;
}

139
lib/ResourceInputStream.php Normal file
View File

@ -0,0 +1,139 @@
<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Failure;
use Amp\Loop;
use Amp\Promise;
class ResourceInputStream implements InputStream {
const DEFAULT_CHUNK_SIZE = 8192;
/** @var resource */
private $resource;
/** @var string */
private $watcher;
/** @var \Amp\Deferred|null */
private $deferred;
/** @var bool */
private $readable = true;
/** @var bool */
private $autoClose = true;
public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE, $autoClose = true) {
if (!is_resource($stream) || get_resource_type($stream) !== 'stream') {
throw new \Error("Expected a valid stream");
}
$meta = \stream_get_meta_data($stream);
if (isset($meta["mode"]) && $meta["mode"] !== ""
&& strpos($meta["mode"], "r") === false
&& strpos($meta["mode"], "+") === false
) {
throw new \Error("Expected a readable stream");
}
\stream_set_blocking($stream, false);
\stream_set_read_buffer($stream, 0);
$this->resource = $stream;
$this->autoClose = $autoClose;
$deferred = &$this->deferred;
$readable = &$this->readable;
$this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use (
&$deferred, &$readable, $chunkSize
) {
if ($deferred === null) {
return;
}
// Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes.
$data = @\fread($stream, $chunkSize);
if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) {
$readable = false;
Loop::cancel($watcher);
$data = null; // Stream closed, resolve read with null.
}
$temp = $deferred;
$deferred = null;
$temp->resolve($data);
if ($deferred === null) { // Only disable watcher if no further read was requested.
Loop::disable($watcher);
}
});
Loop::disable($this->watcher);
}
/**
* Reads data from the stream.
*
* @return Promise Resolves with a string when new data is available or `null` if the stream has closed.
*
* @throws PendingReadException Thrown if another read operation is still pending.
*/
public function read(): Promise {
if ($this->deferred !== null) {
throw new PendingReadException;
}
if (!$this->readable) {
return new Failure(new ClosedException("The stream has been closed"));
}
$this->deferred = new Deferred;
Loop::enable($this->watcher);
return $this->deferred->promise();
}
/**
* Closes the stream forcefully. Multiple `close()` calls are ignored.
*
* Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you
* want to allow half-closed duplex streams, you must use different objects for input and output.
*
* @return void
*/
public function close() {
if ($this->resource === null) {
return;
}
if ($this->autoClose && \is_resource($this->resource)) {
@\fclose($this->resource);
}
$this->resource = null;
$this->readable = false;
if ($this->deferred !== null) {
$deferred = $this->deferred;
$this->deferred = null;
$deferred->resolve(null);
}
Loop::cancel($this->watcher);
}
public function getResource() {
return $this->resource;
}
public function __destruct() {
if ($this->autoClose) {
$this->close();
}
}
}

View File

@ -0,0 +1,218 @@
<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Failure;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
class ResourceOutputStream implements OutputStream {
/** @var resource */
private $resource;
/** @var string */
private $watcher;
/** @var \SplQueue */
private $writes;
/** @var bool */
private $writable = true;
/** @var bool */
private $autoClose = true;
public function __construct($stream, int $chunkSize = 8192, bool $autoClose = true) {
if (!is_resource($stream) || get_resource_type($stream) !== 'stream') {
throw new \Error("Expected a valid stream");
}
$meta = \stream_get_meta_data($stream);
if (isset($meta["mode"]) && $meta["mode"] === "r") {
throw new \Error("Expected a writable stream");
}
\stream_set_blocking($stream, false);
\stream_set_write_buffer($stream, 0);
$this->resource = $stream;
$this->autoClose = $autoClose;
$writes = $this->writes = new \SplQueue;
$writable = &$this->writable;
$this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, &$writable) {
try {
while (!$writes->isEmpty()) {
/** @var \Amp\Deferred $deferred */
list($data, $previous, $deferred) = $writes->shift();
$length = \strlen($data);
if ($length === 0) {
$deferred->resolve(0);
continue;
}
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
$written = @\fwrite($stream, $data);
if ($written === false || $written === 0) {
$writable = false;
$message = "Failed to write to socket";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
$exception = new \Exception($message);
$deferred->fail($exception);
while (!$writes->isEmpty()) {
list(, , $deferred) = $writes->shift();
$deferred->fail($exception);
}
return;
}
if ($length <= $written) {
$deferred->resolve($written + $previous);
continue;
}
$data = \substr($data, $written);
$writes->unshift([$data, $written + $previous, $deferred]);
return;
}
} finally {
if ($writes->isEmpty()) {
Loop::disable($watcher);
}
}
});
Loop::disable($this->watcher);
}
/**
* Writes data to the stream.
*
* @param string $data Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
*/
public function write(string $data): Promise {
return $this->send($data, false);
}
/**
* Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before.
*
* @param string $finalData Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
*/
public function end(string $finalData = ""): Promise {
return $this->send($finalData, true);
}
/**
* @param string $data
* @param bool $end
*
* @return Promise
*/
private function send(string $data, bool $end = false): Promise {
if (!$this->writable) {
return new Failure(new \Exception("The stream is not writable"));
}
$length = \strlen($data);
$written = 0;
if ($end) {
$this->writable = false;
}
if ($this->writes->isEmpty()) {
if ($length === 0) {
if ($end) {
$this->close();
}
return new Success(0);
}
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
$written = @\fwrite($this->resource, $data);
if ($written === false) {
$message = "Failed to write to stream";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
return new Failure(new StreamException($message));
}
if ($length === $written) {
if ($end) {
$this->close();
}
return new Success($written);
}
$data = \substr($data, $written);
}
$deferred = new Deferred;
$this->writes->push([$data, $written, $deferred]);
Loop::enable($this->watcher);
$promise = $deferred->promise();
if ($end) {
$promise->onResolve([$this, 'close']);
}
return $promise;
}
/**
* @inheritdoc
*/
public function close() {
if ($this->resource !== null) {
return;
}
if (\is_resource($this->resource)) {
@\fclose($this->resource);
}
$this->resource = null;
$this->writable = false;
if (!$this->writes->isEmpty()) {
$exception = new ClosedException("The socket was closed before writing completed");
do {
/** @var \Amp\Deferred $deferred */
list(, , $deferred) = $this->writes->shift();
$deferred->fail($exception);
} while (!$this->writes->isEmpty());
}
Loop::cancel($this->watcher);
}
public function getResource() {
return $this->resource;
}
public function __destruct() {
if ($this->autoClose) {
$this->close();
}
}
}

View File

@ -1,21 +0,0 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
interface WritableStream {
/**
* @param string $data
*
* @return \Amp\Promise Succeeds once the data has been successfully written to the stream.
*/
public function write(string $data): Promise;
/**
* @param string $data
*
* @return \Amp\Promise Succeeds once the data has been successfully written to the stream and the stream ended.
*/
public function end(string $data = ""): Promise;
}

View File

@ -2,8 +2,8 @@
namespace Amp\ByteStream;
use Amp\Coroutine;
use Amp\Promise;
use function Amp\call;
// @codeCoverageIgnoreStart
if (\strlen('…') !== 3) {
@ -13,11 +13,20 @@ if (\strlen('…') !== 3) {
} // @codeCoverageIgnoreEnd
/**
* @param \Amp\ByteStream\ReadableStream $source
* @param \Amp\ByteStream\WritableStream $destination
* @param \Amp\ByteStream\InputStream $source
* @param \Amp\ByteStream\OutputStream $destination
*
* @return \Amp\Promise
*/
function pipe(ReadableStream $source, WritableStream $destination): Promise {
return new Coroutine(Internal\pipe($source, $destination));
}
function pipe(InputStream $source, OutputStream $destination): Promise {
return call(function () use ($source, $destination): \Generator {
$written = 0;
while (($chunk = yield $source->read()) !== null) {
$written += \strlen($chunk);
yield $destination->write($chunk);
}
return $written;
});
}

View File

@ -2,7 +2,7 @@
namespace Amp\ByteStream\Test;
use Amp\ByteStream\Message;
use Amp\ByteStream\IteratorStream;
use Amp\Emitter;
use Amp\Loop;
use Amp\PHPUnit\TestCase;
@ -14,7 +14,7 @@ class MessageTest extends TestCase {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$message = new Message($emitter->stream());
$message = new IteratorStream($emitter->stream());
foreach ($values as $value) {
$emitter->emit($value);
@ -33,7 +33,7 @@ class MessageTest extends TestCase {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$message = new Message($emitter->stream());
$message = new IteratorStream($emitter->stream());
foreach ($values as $value) {
$emitter->emit($value);
@ -58,7 +58,7 @@ class MessageTest extends TestCase {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$message = new Message($emitter->stream());
$message = new IteratorStream($emitter->stream());
foreach ($values as $value) {
$emitter->emit($value);
@ -80,7 +80,7 @@ class MessageTest extends TestCase {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$message = new Message($emitter->stream());
$message = new IteratorStream($emitter->stream());
foreach ($values as $value) {
$emitter->emit($value);
@ -105,7 +105,7 @@ class MessageTest extends TestCase {
$value = "abc";
$emitter = new Emitter;
$message = new Message($emitter->stream());
$message = new IteratorStream($emitter->stream());
$emitter->emit($value);
$emitter->fail($exception);
@ -123,7 +123,7 @@ class MessageTest extends TestCase {
public function testEmptyStream() {
Loop::run(function () {
$value = 1;
$message = new Message(new Success($value));
$message = new IteratorStream(new Success($value));
$this->assertFalse(yield $message->advance());
});
@ -138,7 +138,7 @@ class MessageTest extends TestCase {
$value = "abc";
$emitter = new Emitter;
$message = new Message($emitter->stream());
$message = new IteratorStream($emitter->stream());
$emitter->emit($value);
$emitter->resolve();
@ -158,7 +158,7 @@ class MessageTest extends TestCase {
$value = "abc";
$emitter = new Emitter;
$message = new Message($emitter->stream());
$message = new IteratorStream($emitter->stream());
$emitter->emit($value);
$emitter->resolve();