1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-11-26 20:04:51 +01:00
byte-stream/lib/ResourceInputStream.php
Niklas Keller 9f60974881 Fix resource input streams
The read watcher needs to be disabled now, because immediate reads will process the next immediate read now in the next tick.

Fixes #38.
2018-03-11 10:19:43 +01:00

226 lines
6.7 KiB
PHP

<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
/**
 * Input stream abstraction for PHP's stream resources.
 *
 * The stream is switched to non-blocking mode. Every read() first attempts a direct read and only
 * enables the readable watcher on the event loop when no data is available yet.
 */
final class ResourceInputStream implements InputStream {
    const DEFAULT_CHUNK_SIZE = 8192;

    /** @var resource|null Underlying stream; null once close() or free() dropped the reference. */
    private $resource;

    /** @var string Identifier of the readable watcher registered in the constructor. */
    private $watcher;

    /** @var \Amp\Deferred|null Deferred of the pending read, or null if no read is pending. */
    private $deferred;

    /** @var bool False once end-of-file was detected or the stream has been closed / freed. */
    private $readable = true;

    /** @var int Length passed to every read operation. */
    private $chunkSize;

    /** @var bool Whether to read with a single fread() call instead of stream_get_contents(). */
    private $useSingleRead;

    /** @var callable Callback passed to Loop::defer() to resolve the pending read with directly read data. */
    private $immediateCallable;

    /** @var string|null Identifier returned by the most recent Loop::defer() call, if any. */
    private $immediateWatcher;

    /**
     * @param resource $stream Stream resource.
     * @param int $chunkSize Chunk size per read operation.
     *
     * @throws \Error If an invalid stream or parameter has been passed.
     */
    public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE) {
        if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
            throw new \Error("Expected a valid stream");
        }

        $meta = \stream_get_meta_data($stream);

        // UDP sockets and STDIO streams are read with a single fread() call.
        $useSingleRead = $meta["stream_type"] === "udp_socket" || $meta["stream_type"] === "STDIO";
        $this->useSingleRead = $useSingleRead;

        if (\strpos($meta["mode"], "r") === false && \strpos($meta["mode"], "+") === false) {
            throw new \Error("Expected a readable stream");
        }

        \stream_set_blocking($stream, false);
        \stream_set_read_buffer($stream, 0);

        $this->resource = $stream;
        $this->chunkSize = $chunkSize;

        // The callbacks below are static closures (no $this), so the state they have to update
        // is shared with this object through references.
        $deferred = &$this->deferred;
        $readable = &$this->readable;

        $this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use (
            &$deferred, &$readable, $chunkSize, $useSingleRead
        ) {
            $data = self::readChunk($stream, $chunkSize, $useSingleRead);

            // Error suppression, because pthreads does crazy things with resources,
            // which might be closed during two operations.
            // See https://github.com/amphp/byte-stream/issues/32
            if ($data === '' && @\feof($stream)) {
                $readable = false;
                Loop::cancel($watcher);
                $data = null; // Stream closed, resolve read with null.
            }

            // Disable before resolving: read() re-enables the watcher when it has to wait for data,
            // and that read() may be issued from within a resolution callback.
            Loop::disable($watcher);

            // Clear the pending read first so that a new read() is accepted during resolution.
            $pending = $deferred;
            $deferred = null;
            $pending->resolve($data);
        });

        $this->immediateCallable = static function ($watcherId, $data) use (&$deferred) {
            $pending = $deferred;
            $deferred = null;
            $pending->resolve($data);
        };

        // The watcher stays disabled until a read() actually has to wait for data.
        Loop::disable($this->watcher);
    }

    /**
     * Performs a single read attempt on the given stream.
     *
     * @param resource $stream Stream to read from.
     * @param int $chunkSize Length passed to the read function.
     * @param bool $useSingleRead Use fread() if true, stream_get_contents() otherwise.
     *
     * @return string|false Data that was read, possibly an empty string. False can only be returned if
     *     the read failed while assertions are disabled.
     */
    private static function readChunk($stream, int $chunkSize, bool $useSingleRead) {
        // Errors are suppressed here; a failed read is reported through the assertion below.
        $data = $useSingleRead
            ? @\fread($stream, $chunkSize)
            : @\stream_get_contents($stream, $chunkSize);

        \assert($data !== false, "Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.");

        return $data;
    }

    /**
     * {@inheritdoc}
     *
     * The returned promise resolves with the data read, or with null once end-of-file was reached or
     * the stream has been closed.
     *
     * @throws PendingReadError If the previous read has not been resolved yet.
     */
    public function read(): Promise {
        if ($this->deferred !== null) {
            throw new PendingReadError;
        }

        if (!$this->readable) {
            return new Success; // Resolve with null on closed stream.
        }

        // Attempt a direct read, because Windows suffers from slow I/O on STDIN otherwise.
        $data = self::readChunk($this->resource, $this->chunkSize, $this->useSingleRead);

        if ($data === '') {
            // Error suppression, because pthreads does crazy things with resources,
            // which might be closed during two operations.
            // See https://github.com/amphp/byte-stream/issues/32
            if (!@\feof($this->resource)) {
                // Nothing available yet: wait for the readable watcher to resolve the read.
                $this->deferred = new Deferred;
                Loop::enable($this->watcher);

                return $this->deferred->promise();
            }

            $this->readable = false;
            Loop::cancel($this->watcher);
            $data = null; // Stream closed, resolve read with null.
        }

        // Prevent an immediate read -> write loop from blocking everything
        // See e.g. examples/benchmark-throughput.php
        $this->deferred = new Deferred;
        $this->immediateWatcher = Loop::defer($this->immediateCallable, $data);

        return $this->deferred->promise();
    }

    /**
     * Closes the stream forcefully. Multiple `close()` calls are ignored.
     *
     * Streams whose mode contains "+" only get their read side shut down via
     * stream_socket_shutdown(); all other streams are closed with fclose().
     *
     * @return void
     */
    public function close() {
        if ($this->resource) {
            // Error suppression, as resource might already be closed
            $meta = @\stream_get_meta_data($this->resource);

            if ($meta && \strpos($meta["mode"], "+") !== false) {
                @\stream_socket_shutdown($this->resource, \STREAM_SHUT_RD);
            } else {
                @\fclose($this->resource);
            }

            $this->resource = null;
        }

        $this->free();
    }

    /**
     * Nulls reference to resource, marks stream unreadable, and succeeds any pending read with null.
     * Both the readable watcher and the last scheduled deferred callback are cancelled as well.
     */
    private function free() {
        $this->readable = false;
        $this->resource = null;

        if ($this->deferred !== null) {
            $pending = $this->deferred;
            $this->deferred = null;
            $pending->resolve();
        }

        Loop::cancel($this->watcher);

        if ($this->immediateWatcher !== null) {
            Loop::cancel($this->immediateWatcher);
        }
    }

    /**
     * @return resource|null The stream resource or null if the stream has closed.
     */
    public function getResource() {
        return $this->resource;
    }

    /**
     * References the read watcher, so the loop keeps running in case there's an active read.
     *
     * Does nothing once end-of-file has been reached, because the read watcher is cancelled then.
     *
     * @see Loop::reference()
     *
     * @throws \Error If the resource has already been freed.
     */
    public function reference() {
        if (!$this->resource) {
            throw new \Error("Resource has already been freed");
        }

        // At end-of-file the watcher is cancelled while the resource is kept; handing the cancelled
        // identifier to the loop would raise an invalid watcher error.
        if ($this->readable) {
            Loop::reference($this->watcher);
        }
    }

    /**
     * Unreferences the read watcher, so the loop doesn't keep running even if there are active reads.
     *
     * Does nothing once end-of-file has been reached, because the read watcher is cancelled then.
     *
     * @see Loop::unreference()
     *
     * @throws \Error If the resource has already been freed.
     */
    public function unreference() {
        if (!$this->resource) {
            throw new \Error("Resource has already been freed");
        }

        // Mirrors reference(): never hand a cancelled watcher identifier to the loop.
        if ($this->readable) {
            Loop::unreference($this->watcher);
        }
    }

    /**
     * Releases watchers and resolves a pending read unless close() already did so.
     */
    public function __destruct() {
        if ($this->resource !== null) {
            $this->free();
        }
    }
}