1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-11-30 04:19:23 +01:00
byte-stream/lib/ResourceOutputStream.php

226 lines
6.8 KiB
PHP
Raw Normal View History

<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Failure;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
/**
 * Output stream abstraction for PHP's stream resources.
 *
 * A write is attempted immediately when no earlier write is pending. Whatever could not be written right away is
 * queued and flushed by a writability watcher registered with the event loop.
 */
final class ResourceOutputStream implements OutputStream {
    /** @var resource|null Underlying stream; null once the stream has been closed or a queued write failed. */
    private $resource;

    /** @var string Identifier of the writability watcher that flushes the queued writes. */
    private $watcher;

    /** @var \SplQueue Pending writes, each stored as [remaining data, bytes already written, Deferred]. */
    private $writes;

    /** @var bool Set to false once end() is called, the stream is closed, or a queued write failed. */
    private $writable = true;

    /** @var int|null Maximum number of bytes per `fwrite()` call, or null for no limit. */
    private $chunkSize;

    /**
     * @param resource $stream Stream resource. It is switched to non-blocking mode with write buffering disabled.
     * @param int|null $chunkSize Chunk size per `fwrite()` operation. Null (or a non-positive value) means that
     *     every `fwrite()` call is offered all remaining data.
     *
     * @throws \Error If $stream is not a stream resource or its mode marks it as read-only.
     */
    public function __construct($stream, int $chunkSize = null) {
        if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
            throw new \Error("Expected a valid stream");
        }

        $meta = \stream_get_meta_data($stream);

        // A mode containing "r" without "+" is read-only.
        if (isset($meta["mode"])
            && \strpos($meta["mode"], "r") !== false
            && \strpos($meta["mode"], "+") === false
        ) {
            throw new \Error("Expected a writable stream");
        }

        \stream_set_blocking($stream, false);
        \stream_set_write_buffer($stream, 0);

        $this->resource = $stream;
        $this->chunkSize = $chunkSize;

        // The watcher callback is static (it must not keep $this alive), so the state it has to update is
        // shared with it through the queue object and through references to the properties.
        $writes = $this->writes = new \SplQueue;
        $writable = &$this->writable;
        $resource = &$this->resource;

        $this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, $chunkSize, &$writable, &$resource) {
            try {
                while (!$writes->isEmpty()) {
                    /** @var \Amp\Deferred $deferred */
                    list($data, $previous, $deferred) = $writes->shift();
                    $length = \strlen($data);

                    if ($length === 0) {
                        $deferred->resolve(0);
                        continue;
                    }

                    $written = self::writeChunk($stream, $data, $chunkSize);

                    // The stream was reported writable, so writing nothing at all is treated as a failure, too.
                    if ($written === false || $written === 0) {
                        $writable = false;
                        $resource = null;

                        $message = "Failed to write to socket";
                        if ($error = \error_get_last()) {
                            $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
                        }

                        // Fail the current write and everything queued behind it with the same exception.
                        $exception = new StreamException($message);
                        $deferred->fail($exception);
                        while (!$writes->isEmpty()) {
                            list(, , $deferred) = $writes->shift();
                            $deferred->fail($exception);
                        }

                        Loop::cancel($watcher);
                        return;
                    }

                    if ($length <= $written) {
                        $deferred->resolve($written + $previous);
                        continue;
                    }

                    // Partial write: put the remainder back at the front and wait for the next writable event.
                    $data = \substr($data, $written);
                    $writes->unshift([$data, $written + $previous, $deferred]);
                    return;
                }
            } finally {
                // Nothing left to flush: stop watching until send() queues more data.
                if ($writes->isEmpty()) {
                    Loop::disable($watcher);
                }
            }
        });

        // The watcher is only enabled while there is queued data.
        Loop::disable($this->watcher);
    }

    /**
     * Writes data to the stream.
     *
     * @param string $data Bytes to write.
     *
     * @return Promise Succeeds with the number of bytes written once the data has been written to the stream.
     *     Fails with a StreamException if the stream is no longer writable or the write fails, and with a
     *     ClosedException if the stream is closed while the write is still queued.
     */
    public function write(string $data): Promise {
        return $this->send($data, false);
    }

    /**
     * Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before.
     *
     * @param string $finalData Bytes to write.
     *
     * @return Promise Succeeds with the number of bytes written once the data has been written to the stream.
     *     Fails with a StreamException if the stream is no longer writable or the write fails, and with a
     *     ClosedException if the stream is closed while the write is still queued.
     */
    public function end(string $finalData = ""): Promise {
        return $this->send($finalData, true);
    }

    /**
     * Shared implementation of write() and end().
     *
     * @param string $data Bytes to write.
     * @param bool $end Whether the stream should be closed once $data has been handled.
     *
     * @return Promise Resolves with the number of bytes written for this call.
     */
    private function send(string $data, bool $end = false): Promise {
        if ($this->resource === null) {
            return new Failure(new StreamException("The stream is not writable"));
        }

        $length = \strlen($data);
        $written = 0;

        if ($end) {
            $this->writable = false;
        }

        // Only write directly when nothing is queued, otherwise the data would overtake earlier writes.
        if ($this->writes->isEmpty()) {
            if ($length === 0) {
                if ($end) {
                    $this->close();
                }
                return new Success(0);
            }

            $written = self::writeChunk($this->resource, $data, $this->chunkSize);

            if ($written === false) {
                $message = "Failed to write to stream";
                if ($error = \error_get_last()) {
                    $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
                }
                return new Failure(new StreamException($message));
            }

            if ($length === $written) {
                if ($end) {
                    $this->close();
                }
                return new Success($written);
            }

            $data = \substr($data, $written);
        }

        // Queue the (remaining) data and let the watcher flush it once the stream becomes writable.
        $deferred = new Deferred;
        $this->writes->push([$data, $written, $deferred]);
        Loop::enable($this->watcher);
        $promise = $deferred->promise();

        if ($end) {
            // Close regardless of whether the final write succeeded or failed.
            $promise->onResolve(function () {
                $this->close();
            });
        }

        return $promise;
    }

    /**
     * Performs a single `fwrite()` call, limited to $chunkSize bytes if a positive chunk size is given.
     *
     * @param resource $stream Stream to write to.
     * @param string $data Bytes to write.
     * @param int|null $chunkSize Maximum number of bytes to write, or null for no limit.
     *
     * @return int|false Return value of `fwrite()`.
     */
    private static function writeChunk($stream, string $data, $chunkSize) {
        // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
        // Only pass a length when one is configured: before PHP 8.0 the length argument is not nullable, and a
        // null would be coerced to 0, so nothing would be written.
        if ($chunkSize !== null && $chunkSize > 0) {
            return @\fwrite($stream, $data, $chunkSize);
        }

        return @\fwrite($stream, $data);
    }

    /**
     * Closes the stream forcefully. Multiple `close()` calls are ignored.
     *
     * All writes that are still queued are failed with a ClosedException and the watcher is cancelled.
     *
     * @return void
     */
    public function close() {
        $this->resource = null;
        $this->writable = false;

        if (!$this->writes->isEmpty()) {
            $exception = new ClosedException("The socket was closed before writing completed");
            do {
                /** @var \Amp\Deferred $deferred */
                list(, , $deferred) = $this->writes->shift();
                $deferred->fail($exception);
            } while (!$this->writes->isEmpty());
        }

        Loop::cancel($this->watcher);
    }

    /**
     * @return resource|null Stream resource, or null once the stream has been closed or a queued write failed.
     */
    public function getResource() {
        return $this->resource;
    }

    /**
     * Closes the stream if that has not happened yet, so that the watcher does not outlive the object.
     */
    public function __destruct() {
        if ($this->resource !== null) {
            $this->close();
        }
    }
}