mirror of
https://github.com/danog/byte-stream.git
synced 2024-11-30 04:19:23 +01:00
Refactor ResourceInputStream for new API
This commit is contained in:
parent
523ce79dab
commit
4b7a537d7b
@ -2,39 +2,40 @@
|
||||
|
||||
namespace Amp\ByteStream;
|
||||
|
||||
use Amp\Emitter;
|
||||
use Amp\Iterator;
|
||||
use Amp\Deferred;
|
||||
use Amp\Failure;
|
||||
use Amp\Loop;
|
||||
use Amp\Promise;
|
||||
use function Amp\call;
|
||||
|
||||
class ResourceInputStream implements InputStream {
|
||||
const DEFAULT_CHUNK_SIZE = 8192;
|
||||
|
||||
/** @var resource */
|
||||
private $resource;
|
||||
|
||||
/** @var string */
|
||||
private $watcher;
|
||||
|
||||
/** @var Emitter */
|
||||
private $emitter;
|
||||
/** @var \Amp\Deferred|null */
|
||||
private $deferred;
|
||||
|
||||
/** @var Iterator */
|
||||
private $iterator;
|
||||
/** @var bool */
|
||||
private $readable = true;
|
||||
|
||||
/** @var bool */
|
||||
private $autoClose = true;
|
||||
|
||||
/** @var Promise|null */
|
||||
private $readOperation;
|
||||
|
||||
public function __construct($stream, int $chunkSize = 8192, $autoClose = true) {
|
||||
public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE, $autoClose = true) {
|
||||
if (!is_resource($stream) || get_resource_type($stream) !== 'stream') {
|
||||
throw new \Error("Expected a valid stream");
|
||||
}
|
||||
|
||||
$meta = \stream_get_meta_data($stream);
|
||||
|
||||
if (isset($meta["mode"]) && $meta["mode"] !== "" && strpos($meta["mode"], "r") === false && strpos($meta["mode"], "+") === false) {
|
||||
if (isset($meta["mode"]) && $meta["mode"] !== ""
|
||||
&& strpos($meta["mode"], "r") === false
|
||||
&& strpos($meta["mode"], "+") === false
|
||||
) {
|
||||
throw new \Error("Expected a readable stream");
|
||||
}
|
||||
|
||||
@ -42,32 +43,37 @@ class ResourceInputStream implements InputStream {
|
||||
\stream_set_read_buffer($stream, 0);
|
||||
|
||||
$this->resource = $stream;
|
||||
$this->emitter = new Emitter;
|
||||
$this->iterator = $this->emitter->getIterator();
|
||||
$this->autoClose = $autoClose;
|
||||
|
||||
$emitter = &$this->emitter;
|
||||
$deferred = &$this->deferred;
|
||||
$readable = &$this->readable;
|
||||
|
||||
$this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use (
|
||||
&$deferred, &$readable, $chunkSize
|
||||
) {
|
||||
if ($deferred === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use (&$emitter, $chunkSize) {
|
||||
// Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes.
|
||||
$data = @\fread($stream, $chunkSize);
|
||||
|
||||
if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) {
|
||||
$readable = false;
|
||||
Loop::cancel($watcher);
|
||||
$temp = $emitter;
|
||||
$emitter = null;
|
||||
$temp->complete();
|
||||
return;
|
||||
$data = null; // Stream closed, resolve read with null.
|
||||
}
|
||||
|
||||
Loop::disable($watcher);
|
||||
$temp = $deferred;
|
||||
$deferred = null;
|
||||
$temp->resolve($data);
|
||||
|
||||
$emitter->emit($data)->onResolve(function ($exception) use (&$emitter, $watcher) {
|
||||
if ($emitter !== null && $exception === null) {
|
||||
Loop::enable($watcher);
|
||||
}
|
||||
});
|
||||
if ($deferred === null) { // Only disable watcher if no further read was requested.
|
||||
Loop::disable($watcher);
|
||||
}
|
||||
});
|
||||
|
||||
Loop::disable($this->watcher);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -78,22 +84,18 @@ class ResourceInputStream implements InputStream {
|
||||
* @throws PendingReadException Thrown if another read operation is still pending.
|
||||
*/
|
||||
public function read(): Promise {
|
||||
if ($this->readOperation !== null) {
|
||||
if ($this->deferred !== null) {
|
||||
throw new PendingReadException;
|
||||
}
|
||||
|
||||
if (!$this->readable) {
|
||||
return new Failure(new ClosedException("The stream has been closed"));
|
||||
}
|
||||
|
||||
$this->deferred = new Deferred;
|
||||
Loop::enable($this->watcher);
|
||||
|
||||
$this->readOperation = call(function () {
|
||||
if (yield $this->emitter->getIterator()->advance()) {
|
||||
$this->readOperation = null;
|
||||
return $this->emitter->getIterator()->getCurrent();
|
||||
}
|
||||
|
||||
throw new ClosedException("The stream has been closed");
|
||||
});
|
||||
|
||||
return $this->readOperation;
|
||||
return $this->deferred->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -114,11 +116,12 @@ class ResourceInputStream implements InputStream {
|
||||
}
|
||||
|
||||
$this->resource = null;
|
||||
$this->readable = false;
|
||||
|
||||
if ($this->emitter !== null) {
|
||||
$temp = $this->emitter;
|
||||
$this->emitter = null;
|
||||
$temp->complete();
|
||||
if ($this->deferred !== null) {
|
||||
$deferred = $this->deferred;
|
||||
$this->deferred = null;
|
||||
$deferred->resolve(null);
|
||||
}
|
||||
|
||||
Loop::cancel($this->watcher);
|
||||
|
Loading…
Reference in New Issue
Block a user