mirror of
https://github.com/danog/byte-stream.git
synced 2024-12-02 09:17:50 +01:00
Respect chunk size for ResourceOutputStream, document resource streams
This commit is contained in:
parent
a27cd0c3db
commit
2ecbf8aa8b
@ -1 +1,3 @@
|
|||||||
# Resource Streams
|
# Resource Streams
|
||||||
|
|
||||||
|
This package abstracts PHP's stream resources with `ResourceInputStream` and `ResourceOutputStream`. They automatically set the passed resource to non-blocking and allow reading and writing like any other `InputStream` / `OutputStream`. They also handle backpressure automatically by disabling the read watcher in case there's no read request and only activate a write watcher if the underlying write buffer is already full, which makes them very efficient.
|
||||||
|
@ -7,6 +7,9 @@ use Amp\Loop;
|
|||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Success;
|
use Amp\Success;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Input stream abstraction for PHP's stream resources.
|
||||||
|
*/
|
||||||
final class ResourceInputStream implements InputStream {
|
final class ResourceInputStream implements InputStream {
|
||||||
const DEFAULT_CHUNK_SIZE = 8192;
|
const DEFAULT_CHUNK_SIZE = 8192;
|
||||||
|
|
||||||
@ -22,6 +25,10 @@ final class ResourceInputStream implements InputStream {
|
|||||||
/** @var bool */
|
/** @var bool */
|
||||||
private $readable = true;
|
private $readable = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param resource $stream Stream resource.
|
||||||
|
* @param int $chunkSize Chunk size per `fread()` operation.
|
||||||
|
*/
|
||||||
public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE) {
|
public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE) {
|
||||||
if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
|
if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
|
||||||
throw new \Error("Expected a valid stream");
|
throw new \Error("Expected a valid stream");
|
||||||
@ -74,13 +81,7 @@ final class ResourceInputStream implements InputStream {
|
|||||||
Loop::disable($this->watcher);
|
Loop::disable($this->watcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/** @inheritdoc */
|
||||||
* Reads data from the stream.
|
|
||||||
*
|
|
||||||
* @return Promise Resolves with a string when new data is available or `null` if the stream has closed.
|
|
||||||
*
|
|
||||||
* @throws PendingReadError Thrown if another read operation is still pending.
|
|
||||||
*/
|
|
||||||
public function read(): Promise {
|
public function read(): Promise {
|
||||||
if ($this->deferred !== null) {
|
if ($this->deferred !== null) {
|
||||||
throw new PendingReadError;
|
throw new PendingReadError;
|
||||||
@ -99,6 +100,9 @@ final class ResourceInputStream implements InputStream {
|
|||||||
/**
|
/**
|
||||||
* Closes the stream forcefully. Multiple `close()` calls are ignored.
|
* Closes the stream forcefully. Multiple `close()` calls are ignored.
|
||||||
*
|
*
|
||||||
|
* This does only free the resource internally, the underlying file descriptor isn't closed. This is left to PHP's
|
||||||
|
* garbage collection system.
|
||||||
|
*
|
||||||
* @return void
|
* @return void
|
||||||
*/
|
*/
|
||||||
public function close() {
|
public function close() {
|
||||||
|
@ -8,6 +8,9 @@ use Amp\Loop;
|
|||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Success;
|
use Amp\Success;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Output stream abstraction for PHP's stream resources.
|
||||||
|
*/
|
||||||
final class ResourceOutputStream implements OutputStream {
|
final class ResourceOutputStream implements OutputStream {
|
||||||
/** @var resource */
|
/** @var resource */
|
||||||
private $resource;
|
private $resource;
|
||||||
@ -21,7 +24,14 @@ final class ResourceOutputStream implements OutputStream {
|
|||||||
/** @var bool */
|
/** @var bool */
|
||||||
private $writable = true;
|
private $writable = true;
|
||||||
|
|
||||||
public function __construct($stream, int $chunkSize = 8192) {
|
/** @var int|null */
|
||||||
|
private $chunkSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param $stream Stream resource.
|
||||||
|
* @param int|null $chunkSize Chunk size per `fwrite()` operation.
|
||||||
|
*/
|
||||||
|
public function __construct($stream, int $chunkSize = null) {
|
||||||
if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
|
if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
|
||||||
throw new \Error("Expected a valid stream");
|
throw new \Error("Expected a valid stream");
|
||||||
}
|
}
|
||||||
@ -39,12 +49,13 @@ final class ResourceOutputStream implements OutputStream {
|
|||||||
\stream_set_write_buffer($stream, 0);
|
\stream_set_write_buffer($stream, 0);
|
||||||
|
|
||||||
$this->resource = $stream;
|
$this->resource = $stream;
|
||||||
|
$this->chunkSize = $chunkSize;
|
||||||
|
|
||||||
$writes = $this->writes = new \SplQueue;
|
$writes = $this->writes = new \SplQueue;
|
||||||
$writable = &$this->writable;
|
$writable = &$this->writable;
|
||||||
$resource = &$this->resource;
|
$resource = &$this->resource;
|
||||||
|
|
||||||
$this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, &$writable, &$resource) {
|
$this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, $chunkSize, &$writable, &$resource) {
|
||||||
try {
|
try {
|
||||||
while (!$writes->isEmpty()) {
|
while (!$writes->isEmpty()) {
|
||||||
/** @var \Amp\Deferred $deferred */
|
/** @var \Amp\Deferred $deferred */
|
||||||
@ -57,7 +68,7 @@ final class ResourceOutputStream implements OutputStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
|
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
|
||||||
$written = @\fwrite($stream, $data);
|
$written = @\fwrite($stream, $data, $chunkSize);
|
||||||
|
|
||||||
if ($written === false || $written === 0) {
|
if ($written === false || $written === 0) {
|
||||||
$writable = false;
|
$writable = false;
|
||||||
@ -123,12 +134,6 @@ final class ResourceOutputStream implements OutputStream {
|
|||||||
return $this->send($finalData, true);
|
return $this->send($finalData, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param string $data
|
|
||||||
* @param bool $end
|
|
||||||
*
|
|
||||||
* @return Promise
|
|
||||||
*/
|
|
||||||
private function send(string $data, bool $end = false): Promise {
|
private function send(string $data, bool $end = false): Promise {
|
||||||
if ($this->resource === null) {
|
if ($this->resource === null) {
|
||||||
return new Failure(new StreamException("The stream is not writable"));
|
return new Failure(new StreamException("The stream is not writable"));
|
||||||
@ -150,7 +155,7 @@ final class ResourceOutputStream implements OutputStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
|
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
|
||||||
$written = @\fwrite($this->resource, $data);
|
$written = @\fwrite($this->resource, $data, $this->chunkSize);
|
||||||
|
|
||||||
if ($written === false) {
|
if ($written === false) {
|
||||||
$message = "Failed to write to stream";
|
$message = "Failed to write to stream";
|
||||||
|
Loading…
Reference in New Issue
Block a user