1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-12-12 09:29:57 +01:00
byte-stream/lib/MemoryStream.php

159 lines
4.3 KiB
PHP
Raw Normal View History

2016-12-30 06:09:06 +01:00
<?php
2016-08-16 23:23:46 +02:00
2016-08-10 23:48:42 +02:00
namespace Amp\Stream;
2016-08-16 00:19:32 +02:00
use Amp\{ Deferred, Failure, Success };
2016-11-14 22:05:19 +01:00
use Interop\Async\Promise;
2016-08-10 23:48:42 +02:00
/**
* Serves as buffer that implements the stream interface, allowing consumers to be notified when data is available in
* the buffer. This class by itself is not particularly useful, but it can be extended to add functionality upon reading
* or writing, as well as acting as an example of how stream classes can be implemented.
*/
class MemoryStream implements Stream {
/** @var \Amp\Stream\Buffer */
private $buffer;
/** @var bool */
private $readable = true;
/** @var bool */
private $writable = true;
/** @var \SplQueue */
private $reads;
/**
* @param string $data
*/
2016-08-16 00:19:32 +02:00
public function __construct(string $data = '') {
2016-08-10 23:48:42 +02:00
$this->buffer = new Buffer($data);
$this->reads = new \SplQueue;
}
/**
* {@inheritdoc}
*/
2016-08-16 00:19:32 +02:00
public function isReadable(): bool {
2016-08-10 23:48:42 +02:00
return $this->readable;
}
/**
* {@inheritdoc}
*/
2016-08-16 00:19:32 +02:00
public function isWritable(): bool {
2016-08-10 23:48:42 +02:00
return $this->writable;
}
/**
* {@inheritdoc}
*/
public function close() {
$this->readable = false;
$this->writable = false;
if (!$this->reads->isEmpty()) {
$exception = new ClosedException("The stream was unexpectedly closed");
do {
/** @var \Amp\Deferred $deferred */
list( , , $deferred) = $this->reads->shift();
$deferred->fail($exception);
} while (!$this->reads->isEmpty());
}
}
/**
* {@inheritdoc}
*/
2016-11-14 22:05:19 +01:00
public function read(int $bytes = null, string $delimiter = null): Promise {
2016-08-16 00:19:32 +02:00
if ($bytes !== null && $bytes <= 0) {
throw new \InvalidArgumentException("The number of bytes to read should be a positive integer or null");
2016-08-10 23:48:42 +02:00
}
if (!$this->readable) {
return new Failure(new \LogicException("The stream has been closed"));
}
$deferred = new Deferred;
$this->reads->push([$bytes, $delimiter, $deferred]);
$this->checkPendingReads();
2016-11-14 22:05:19 +01:00
return $deferred->promise();
2016-08-10 23:48:42 +02:00
}
/**
* Returns bytes from the buffer based on the current length or current search byte.
*/
private function checkPendingReads() {
while (!$this->buffer->isEmpty() && !$this->reads->isEmpty()) {
/**
* @var int|null $bytes
* @var string|null $delimiter
* @var \Amp\Deferred $deferred
*/
list($bytes, $delimiter, $deferred) = $this->reads->shift();
if ($delimiter !== null && ($position = $this->buffer->search($delimiter)) !== false) {
$length = $position + \strlen($delimiter);
if ($bytes === null || $length < $bytes) {
$deferred->resolve($this->buffer->shift($length));
continue;
}
}
if ($bytes !== null && $this->buffer->getLength() >= $bytes) {
$deferred->resolve($this->buffer->shift($bytes));
continue;
}
if ($bytes === null) {
$deferred->resolve($this->buffer->drain());
continue;
}
$this->reads->unshift([$bytes, $delimiter, $deferred]);
return;
}
2016-08-16 00:19:32 +02:00
if (!$this->writable && $this->buffer->isEmpty()) {
$this->close();
}
2016-08-10 23:48:42 +02:00
}
/**
* {@inheritdoc}
*/
2016-11-14 22:05:19 +01:00
public function write(string $data): Promise {
2016-08-10 23:48:42 +02:00
return $this->send($data, false);
}
/**
* {@inheritdoc}
*/
2016-11-14 22:05:19 +01:00
public function end(string $data = ''): Promise {
2016-08-10 23:48:42 +02:00
return $this->send($data, true);
}
/**
* @param string $data
* @param bool $end
*
2016-11-14 22:05:19 +01:00
* @return \Interop\Async\Promise
2016-08-10 23:48:42 +02:00
*/
2016-11-14 22:05:19 +01:00
protected function send(string $data, bool $end = false): Promise {
2016-08-10 23:48:42 +02:00
if (!$this->writable) {
return new Failure(new \LogicException("The stream is not writable"));
}
if ($end) {
$this->writable = false;
}
$this->buffer->push($data);
$this->checkPendingReads();
return new Success(\strlen($data));
}
}