1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-11-30 04:19:23 +01:00
byte-stream/lib/ResourceInputStream.php

207 lines
6.1 KiB
PHP
Raw Normal View History

<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
/**
 * Input stream abstraction for PHP's stream resources.
 *
 * A read is first attempted directly on the resource; only when no data is immediately
 * available is a loop watcher enabled to complete the read later. Once the end of the
 * stream has been reached, or the stream has been closed, reads resolve with null.
 */
final class ResourceInputStream implements InputStream {
    const DEFAULT_CHUNK_SIZE = 8192;

    /** @var resource|null Stream resource, or null once the stream has been closed or freed. */
    private $resource;

    /** @var string Identifier of the readable watcher registered with the loop. */
    private $watcher;

    /** @var \Amp\Deferred|null Deferred of the currently pending read, if any. */
    private $deferred;

    /** @var bool False once EOF has been detected or the stream has been closed. */
    private $readable = true;

    /** @var int Length passed to each read call. */
    private $chunkSize;

    /** @var bool Whether to read using fread() instead of stream_get_contents(). */
    private $useSingleRead;

    /**
     * @param resource $stream Stream resource.
     * @param int $chunkSize Chunk size per read operation.
     *
     * @throws \Error If an invalid stream or parameter has been passed.
     */
    public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE) {
        if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
            throw new \Error("Expected a valid stream");
        }

        $meta = \stream_get_meta_data($stream);
        $useSingleRead = $meta["stream_type"] === "udp_socket" || $meta["stream_type"] === "STDIO";
        $this->useSingleRead = $useSingleRead;

        if (\strpos($meta["mode"], "r") === false && \strpos($meta["mode"], "+") === false) {
            throw new \Error("Expected a readable stream");
        }

        \stream_set_blocking($stream, false);
        \stream_set_read_buffer($stream, 0);

        $this->resource = $stream;
        $this->chunkSize = $chunkSize;

        // The watcher callback is static and captures references to these two properties
        // instead of $this (presumably so the watcher does not keep this object alive).
        $deferred = &$this->deferred;
        $readable = &$this->readable;

        $this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use (
            &$deferred, &$readable, $chunkSize, $useSingleRead
        ) {
            if ($useSingleRead) {
                $data = @\fread($stream, $chunkSize);
            } else {
                $data = @\stream_get_contents($stream, $chunkSize);
            }

            \assert($data !== false, "Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.");

            // Error suppression, because pthreads does crazy things with resources,
            // which might be closed during two operations.
            // See https://github.com/amphp/byte-stream/issues/32
            if ($data === '' && @\feof($stream)) {
                $readable = false;
                Loop::cancel($watcher);
                $data = null; // Stream closed, resolve read with null.
            }

            // Clear the pending read before resolving, so that a callback attached to the
            // promise may immediately request another read.
            $temp = $deferred;
            $deferred = null;
            $temp->resolve($data);

            if ($deferred === null) { // Only disable watcher if no further read was requested.
                Loop::disable($watcher);
            }
        });

        // The watcher stays disabled until read() finds no data immediately available.
        Loop::disable($this->watcher);
    }

    /** @inheritdoc */
    public function read(): Promise {
        if ($this->deferred !== null) {
            throw new PendingReadError;
        }

        if (!$this->readable) {
            return new Success; // Resolve with null on closed stream.
        }

        // Attempt a direct read, because Windows suffers from slow I/O on STDIN otherwise.
        if ($this->useSingleRead) {
            $data = @\fread($this->resource, $this->chunkSize);
        } else {
            $data = @\stream_get_contents($this->resource, $this->chunkSize);
        }

        \assert($data !== false, "Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.");

        if ($data === '') {
            // Error suppression, because pthreads does crazy things with resources,
            // which might be closed during two operations.
            // See https://github.com/amphp/byte-stream/issues/32
            if (@\feof($this->resource)) {
                $this->readable = false;
                Loop::cancel($this->watcher);
                $data = null; // Stream closed, resolve read with null.
            } else {
                // Nothing available yet: let the watcher resolve the read once data arrives.
                $this->deferred = new Deferred;
                Loop::enable($this->watcher);

                return $this->deferred->promise();
            }
        }

        return new Success($data);
    }

    /**
     * Closes the stream forcefully. Multiple `close()` calls are ignored.
     *
     * A pending read, if any, is resolved with null.
     *
     * @return void
     */
    public function close() {
        // A resource that has been fclose()'d elsewhere is still truthy, so is_resource() is
        // required here. Passing a closed resource to the stream functions below throws a
        // TypeError on PHP 8, which the @ operator cannot suppress.
        if (\is_resource($this->resource)) {
            // Error suppression, as resource might already be closed
            $meta = @\stream_get_meta_data($this->resource);

            if ($meta && \strpos($meta["mode"], "+") !== false) {
                // Mode contains "+": only shut down the reading side instead of closing
                // the resource (presumably because a writer may still be using it).
                @\stream_socket_shutdown($this->resource, \STREAM_SHUT_RD);
            } else {
                @\fclose($this->resource);
            }
        }

        $this->free();
    }

    /**
     * Nulls reference to resource, marks stream unreadable, and succeeds any pending read with null.
     */
    private function free() {
        $this->readable = false;
        $this->resource = null;

        if ($this->deferred !== null) {
            // Clear the property before resolving, so callbacks observe no pending read.
            $deferred = $this->deferred;
            $this->deferred = null;
            $deferred->resolve();
        }

        Loop::cancel($this->watcher);
    }

    /**
     * @return resource|null The stream resource or null if the stream has closed.
     */
    public function getResource() {
        return $this->resource;
    }

    /**
     * References the read watcher, so the loop keeps running in case there's an active read.
     *
     * @throws \Error If the resource has already been freed.
     *
     * @see Loop::reference()
     */
    public function reference() {
        if (!$this->resource) {
            throw new \Error("Resource has already been freed");
        }

        Loop::reference($this->watcher);
    }

    /**
     * Unreferences the read watcher, so the loop doesn't keep running even if there are active reads.
     *
     * @throws \Error If the resource has already been freed.
     *
     * @see Loop::unreference()
     */
    public function unreference() {
        if (!$this->resource) {
            throw new \Error("Resource has already been freed");
        }

        Loop::unreference($this->watcher);
    }

    /**
     * Releases the watcher and resolves any pending read if the stream was never closed.
     * The underlying resource itself is not closed here.
     */
    public function __destruct() {
        if ($this->resource !== null) {
            $this->free();
        }
    }
}