1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-12-11 17:09:43 +01:00
byte-stream/lib/IteratorStream.php

64 lines
1.5 KiB
PHP
Raw Normal View History

<?php
namespace Amp\ByteStream;
use Amp\Deferred;
2017-05-22 10:41:35 +02:00
use Amp\Failure;
use Amp\Iterator;
use Amp\Promise;
2018-09-21 22:45:13 +02:00
/**
 * Exposes an Iterator as an InputStream: every read() advances the wrapped iterator by one element and
 * hands that element out as the next chunk.
 *
 * The stream is "sticky" on failure: once the iterator fails or emits a non-string value, the exception is
 * stored and every later read() fails with that same exception without touching the iterator again.
 */
final class IteratorStream implements InputStream
{
    /** @var Iterator Source of the chunks returned by read(). */
    private $iterator;

    /** @var \Throwable|null Failure recorded by an earlier read(); null while the stream is healthy. */
    private $exception;

    /** @var bool True while the promise returned by the last read() has not been settled yet. */
    private $pending = false;

    /**
     * @param Iterator $iterator Iterator whose elements are expected to be strings; any other element type
     *     makes the corresponding read() fail with a StreamException.
     */
    public function __construct(Iterator $iterator)
    {
        $this->iterator = $iterator;
    }

    /**
     * Advances the iterator and returns a promise for the next chunk.
     *
     * @return Promise Resolved with the next string emitted by the iterator, or resolved without a value
     *     when the iterator reports that it has no further element. Failed with the iterator's error if
     *     advancing fails, with a StreamException if the current element is not a string, or with the
     *     previously recorded exception if an earlier read() already failed.
     *
     * @throws PendingReadError If the promise returned by the previous call has not been settled yet.
     */
    public function read(): Promise
    {
        // A recorded failure is final: report it again instead of advancing the iterator.
        if ($this->exception) {
            return new Failure($this->exception);
        }

        if ($this->pending) {
            throw new PendingReadError;
        }

        $this->pending = true;
        $deferred = new Deferred;

        $this->iterator->advance()->onResolve(function ($error, $hasNextElement) use ($deferred) {
            // Clear the flag before settling the deferred so that a consumer reacting to the settled
            // promise can call read() again right away without hitting PendingReadError.
            $this->pending = false;

            if ($error) {
                $this->exception = $error;
                $deferred->fail($error);
                return;
            }

            if (!$hasNextElement) {
                // Iterator reported no further element: resolve without a value.
                $deferred->resolve();
                return;
            }

            $chunk = $this->iterator->getCurrent();

            if (!\is_string($chunk)) {
                $this->exception = new StreamException(\sprintf(
                    "Unexpected iterator value of type '%s', expected string",
                    \is_object($chunk) ? \get_class($chunk) : \gettype($chunk)
                ));
                $deferred->fail($this->exception);
                return;
            }

            $deferred->resolve($chunk);
        });

        return $deferred->promise();
    }
}