1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-12-12 09:29:57 +01:00
byte-stream/lib/Payload.php

93 lines
2.4 KiB
PHP
Raw Normal View History

<?php
namespace Amp\ByteStream;
use Amp\Coroutine;
use Amp\Promise;
use function Amp\call;
/**
* Creates a buffered message from an InputStream. The message can be consumed in chunks using the read() API or it may
* be buffered and accessed in its entirety by calling buffer(). Once buffering is requested through buffer(), the
* stream cannot be read in chunks. On destruct any remaining data is read from the InputStream given to this class.
*/
2018-09-21 22:45:13 +02:00
class Payload implements InputStream
{
/** @var InputStream */
private $stream;
/** @var \Amp\Promise|null */
private $promise;
/** @var \Amp\Promise|null */
private $lastRead;
/**
* @param \Amp\ByteStream\InputStream $stream
*/
2018-09-21 22:45:13 +02:00
public function __construct(InputStream $stream)
{
$this->stream = $stream;
}
2018-09-21 22:45:13 +02:00
public function __destruct()
{
if (!$this->promise) {
Promise\rethrow(new Coroutine($this->consume()));
}
}
2018-09-21 22:45:13 +02:00
private function consume(): \Generator
{
try {
if ($this->lastRead && null === yield $this->lastRead) {
return;
}
while (null !== yield $this->stream->read()) {
// Discard unread bytes from message.
}
} catch (\Throwable $exception) {
// If exception is thrown here the connection closed anyway.
}
}
/**
* @inheritdoc
*
* @throws \Error If a buffered message was requested by calling buffer().
*/
2018-09-21 22:45:13 +02:00
final public function read(): Promise
{
if ($this->promise) {
throw new \Error("Cannot stream message data once a buffered message has been requested");
}
return $this->lastRead = $this->stream->read();
}
/**
* Buffers the entire message and resolves the returned promise then.
*
* @return Promise<string> Resolves with the entire message contents.
*/
2018-09-21 22:45:13 +02:00
final public function buffer(): Promise
{
if ($this->promise) {
return $this->promise;
}
return $this->promise = call(function () {
$buffer = '';
if ($this->lastRead && null === yield $this->lastRead) {
return $buffer;
}
while (null !== $chunk = yield $this->stream->read()) {
$buffer .= $chunk;
}
return $buffer;
});
}
}