1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-11-26 20:04:51 +01:00
byte-stream/lib/ResourceOutputStream.php

289 lines
9.0 KiB
PHP
Raw Permalink Normal View History

<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Failure;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
/**
* Output stream abstraction for PHP's stream resources.
*/
2018-09-21 22:45:13 +02:00
final class ResourceOutputStream implements OutputStream
{
    const MAX_CONSECUTIVE_EMPTY_WRITES = 3;
    const LARGE_CHUNK_SIZE = 128 * 1024;

    /** @var resource|null Underlying stream; set to null once the stream is freed or a queued write fails. */
    private $resource;

    /** @var string|null Identifier of the writability watcher; null until the constructor registers it. */
    private $watcher;

    /** @var \SplQueue Pending writes, each stored as a [data, bytes already written, Deferred] tuple. */
    private $writes;

    /** @var bool False once end() has been called, the stream was closed, or a queued write failed. */
    private $writable = true;

    /** @var int|null Maximum number of bytes passed to a single fwrite() call; falsy means no limit. */
    private $chunkSize;

    /**
     * Switches the stream to non-blocking, unbuffered mode and registers a (disabled) writability watcher
     * that drains the queue of pending writes whenever the stream becomes writable.
     *
     * @param resource $stream Stream resource.
     * @param int|null $chunkSize Chunk size per `fwrite()` operation.
     *
     * @throws \Error If $stream is not a stream resource or was opened read-only.
     */
    public function __construct($stream, int $chunkSize = null)
    {
        if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
            throw new \Error("Expected a valid stream");
        }

        $meta = \stream_get_meta_data($stream);

        // A mode containing "r" without "+" is read-only.
        if (\strpos($meta["mode"], "r") !== false && \strpos($meta["mode"], "+") === false) {
            throw new \Error("Expected a writable stream");
        }

        \stream_set_blocking($stream, false);
        \stream_set_write_buffer($stream, 0);

        $this->resource = $stream;

        // The property is bound by reference to the local variable captured by the watcher below, so a later
        // setChunkSize() call is visible inside the closure without the closure holding a reference to $this.
        $this->chunkSize = &$chunkSize;

        $writes = $this->writes = new \SplQueue;
        $writable = &$this->writable;
        $resource = &$this->resource;

        $this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, &$chunkSize, &$writable, &$resource) {
            static $emptyWrites = 0;

            try {
                while (!$writes->isEmpty()) {
                    /** @var \Amp\Deferred $deferred */
                    list($data, $previous, $deferred) = $writes->shift();
                    $length = \strlen($data);

                    if ($length === 0) {
                        $deferred->resolve(0);
                        continue;
                    }

                    if (!\is_resource($stream) || (($metaData = @\stream_get_meta_data($stream)) && $metaData['eof'])) {
                        throw new ClosedException("The stream was closed by the peer");
                    }

                    $written = self::writeChunk($stream, $data, $chunkSize);

                    // Since PHP 7.4 fwrite() returns false on failure. PHP 7.4.0 and 7.4.1 may also return false
                    // when the write would merely block, so only newer versions treat false as a hard failure.
                    if ($written === false && \PHP_VERSION_ID >= 70402) {
                        throw new StreamException(self::failureMessage("Failed to write to stream"));
                    }

                    // Broken pipes between processes on macOS/FreeBSD do not detect EOF properly.
                    // A false result that was not handled above is counted as an empty write as well, otherwise
                    // the chunk would be re-queued forever when assertions are disabled.
                    if ($written === 0 || $written === false) {
                        if ($emptyWrites++ > self::MAX_CONSECUTIVE_EMPTY_WRITES) {
                            throw new StreamException(
                                self::failureMessage("Failed to write to stream after multiple attempts")
                            );
                        }

                        $writes->unshift([$data, $previous, $deferred]);
                        return;
                    }

                    $emptyWrites = 0;

                    if ($length > $written) {
                        // Partial write: put the remainder back at the front and wait for the next writable event.
                        $data = \substr($data, $written);
                        $writes->unshift([$data, $written + $previous, $deferred]);
                        return;
                    }

                    $deferred->resolve($written + $previous);
                }
            } catch (\Throwable $exception) {
                // Written through the references, i.e. this updates the owning object's properties.
                $resource = null;
                $writable = false;

                $deferred->fail($exception);
                while (!$writes->isEmpty()) {
                    list(, , $deferred) = $writes->shift();
                    $deferred->fail($exception);
                }

                Loop::cancel($watcher);
            } finally {
                if ($writes->isEmpty()) {
                    Loop::disable($watcher);
                }
            }
        });

        // Nothing is queued yet; send() enables the watcher once a write could not be completed immediately.
        Loop::disable($this->watcher);
    }

    /**
     * Writes data to the stream.
     *
     * @param string $data Bytes to write.
     *
     * @return Promise Succeeds once the data has been successfully written to the stream. Fails with a
     *     ClosedException if the stream is no longer writable.
     */
    public function write(string $data): Promise
    {
        return $this->send($data, false);
    }

    /**
     * Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before.
     *
     * @param string $finalData Bytes to write.
     *
     * @return Promise Succeeds once the data has been successfully written to the stream. Fails with a
     *     ClosedException if the stream is no longer writable.
     */
    public function end(string $finalData = ""): Promise
    {
        return $this->send($finalData, true);
    }

    /**
     * Attempts an immediate write if nothing is queued and queues whatever could not be written.
     *
     * @param string $data Bytes to write.
     * @param bool   $end  Whether the stream should be closed once the data has been written.
     *
     * @return Promise Resolves with the number of bytes written.
     */
    private function send(string $data, bool $end = false): Promise
    {
        if (!$this->writable) {
            return new Failure(new ClosedException("The stream is not writable"));
        }

        $length = \strlen($data);
        $written = 0;

        if ($end) {
            $this->writable = false;
        }

        if ($this->writes->isEmpty()) {
            if ($length === 0) {
                if ($end) {
                    $this->close();
                }

                return new Success(0);
            }

            if (!\is_resource($this->resource) || (($metaData = @\stream_get_meta_data($this->resource)) && $metaData['eof'])) {
                return new Failure(new ClosedException("The stream was closed by the peer"));
            }

            $written = self::writeChunk($this->resource, $data, $this->chunkSize);

            // Since PHP 7.4 fwrite() returns false on failure. PHP 7.4.0 and 7.4.1 may also return false when the
            // write would merely block, so only newer versions treat false as a hard failure.
            if ($written === false && \PHP_VERSION_ID >= 70402) {
                return new Failure(new StreamException(self::failureMessage("Failed to write to stream")));
            }

            // Any remaining false means "nothing written"; keep the arithmetic below on integers.
            $written = (int) $written;

            if ($length === $written) {
                if ($end) {
                    $this->close();
                }

                return new Success($written);
            }

            $data = \substr($data, $written);
        }

        $deferred = new Deferred;

        if ($length - $written > self::LARGE_CHUNK_SIZE) {
            // Split large payloads so the watcher never has to substr() a huge string after a partial write.
            // Only the last chunk carries the Deferred returned to the caller.
            $chunks = \str_split($data, self::LARGE_CHUNK_SIZE);
            $data = \array_pop($chunks);
            foreach ($chunks as $chunk) {
                $this->writes->push([$chunk, $written, new Deferred]);
                $written += self::LARGE_CHUNK_SIZE;
            }
        }

        $this->writes->push([$data, $written, $deferred]);
        Loop::enable($this->watcher);
        $promise = $deferred->promise();

        if ($end) {
            $promise->onResolve([$this, "close"]);
        }

        return $promise;
    }

    /**
     * Closes the stream forcefully. Multiple `close()` calls are ignored.
     *
     * Streams whose mode contains "+" only get their write side shut down; all others are closed with fclose().
     *
     * @return void
     */
    public function close()
    {
        // A closed resource is still truthy, and passing it to stream functions throws a TypeError on PHP 8
        // that the @ operator cannot suppress, so verify it is still an open stream first.
        if (\is_resource($this->resource) && \get_resource_type($this->resource) === 'stream') {
            $meta = \stream_get_meta_data($this->resource);

            if (\strpos($meta["mode"], "+") !== false) {
                // Error suppression, as the stream is not necessarily a socket.
                @\stream_socket_shutdown($this->resource, \STREAM_SHUT_WR);
            } else {
                @\fclose($this->resource);
            }
        }

        $this->free();
    }

    /**
     * Nulls reference to resource, marks stream unwritable, and fails any pending write.
     *
     * @return void
     */
    private function free()
    {
        $this->resource = null;
        $this->writable = false;

        if (!$this->writes->isEmpty()) {
            $exception = new ClosedException("The socket was closed before writing completed");
            do {
                /** @var \Amp\Deferred $deferred */
                list(, , $deferred) = $this->writes->shift();
                $deferred->fail($exception);
            } while (!$this->writes->isEmpty());
        }

        // The watcher is still null if registering it failed inside the constructor.
        if ($this->watcher !== null) {
            Loop::cancel($this->watcher);
        }
    }

    /**
     * @return resource|null Stream resource or null if the stream has been closed or a queued write failed.
     */
    public function getResource()
    {
        return $this->resource;
    }

    /**
     * Changes the maximum number of bytes passed to each `fwrite()` call.
     *
     * @param int $chunkSize Chunk size per `fwrite()` operation.
     *
     * @return void
     */
    public function setChunkSize(int $chunkSize)
    {
        $this->chunkSize = $chunkSize;
    }

    /**
     * Fails pending writes and cancels the watcher if the stream was never closed explicitly.
     */
    public function __destruct()
    {
        if ($this->resource !== null) {
            $this->free();
        }
    }

    /**
     * Performs a single fwrite() call, limited to $chunkSize bytes when a chunk size is set.
     *
     * @param resource $stream    Stream to write to.
     * @param string   $data      Bytes to write.
     * @param int|null $chunkSize Maximum number of bytes to write; falsy means no limit.
     *
     * @return int|false Number of bytes written, or false if fwrite() reported a failure.
     */
    private static function writeChunk($stream, string $data, $chunkSize)
    {
        // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
        // Use conditional, because PHP doesn't like getting null passed.
        if ($chunkSize) {
            $written = @\fwrite($stream, $data, $chunkSize);
        } else {
            $written = @\fwrite($stream, $data);
        }

        // PHP 7.4+ legitimately returns false on write failures, so only older versions assert here.
        \assert(
            $written !== false || \PHP_VERSION_ID >= 70400,
            "Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to."
        );

        return $written;
    }

    /**
     * Appends the message of the last recorded PHP error, if any, to the given failure message.
     *
     * @param string $message Base failure message.
     *
     * @return string
     */
    private static function failureMessage(string $message): string
    {
        if ($error = \error_get_last()) {
            $message .= \sprintf("; %s", $error["message"]);
        }

        return $message;
    }
}