2017-04-30 08:31:53 +02:00
< ? php
namespace Amp\ByteStream ;
use Amp\Deferred ;
use Amp\Failure ;
use Amp\Loop ;
use Amp\Promise ;
use Amp\Success ;
2017-05-25 18:12:12 +02:00
/**
 * Output stream abstraction for PHP's stream resources.
 *
 * A write is attempted immediately when nothing is queued. Whatever could not be written right away is queued and
 * flushed by an event-loop watcher that fires when the underlying resource becomes writable.
 */
final class ResourceOutputStream implements OutputStream
{
    /** Threshold the writability watcher compares its counter of consecutive zero-byte writes against. */
    const MAX_CONSECUTIVE_EMPTY_WRITES = 3;

    /** Pending data longer than this many bytes is split into separately queued chunks of this size. */
    const LARGE_CHUNK_SIZE = 128 * 1024;

    /** @var resource|null Underlying stream; null once the stream has been closed or has failed. */
    private $resource;

    /** @var string Identifier of the writability watcher registered with the event loop. */
    private $watcher;

    /** @var \SplQueue Queue of pending writes, each entry being [remaining data, bytes already written, Deferred]. */
    private $writes;

    /** @var bool False once end() was called, the stream was closed, or a queued write failed. */
    private $writable = true;

    /** @var int|null Maximum number of bytes passed to a single `fwrite()` call; null/0 means no limit. */
    private $chunkSize;

    /**
     * @param resource $stream Stream resource.
     * @param int|null $chunkSize Chunk size per `fwrite()` operation.
     *
     * @throws \Error If $stream is not a stream resource or was not opened in a writable mode.
     */
    public function __construct($stream, int $chunkSize = null)
    {
        if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
            throw new \Error(" Expected a valid stream ");
        }

        $meta = \stream_get_meta_data($stream);

        // A mode that contains "r" but no "+" is read-only.
        if (\strpos($meta["mode"], "r") !== false && \strpos($meta["mode"], "+") === false) {
            throw new \Error(" Expected a writable stream ");
        }

        \stream_set_blocking($stream, false);
        \stream_set_write_buffer($stream, 0);

        $this->resource = $stream;

        // The property is bound by reference to the local variable the watcher captures, so that
        // setChunkSize() also changes the chunk size seen inside the watcher callback.
        $this->chunkSize = &$chunkSize;

        $writes = $this->writes = new \SplQueue;
        $writable = &$this->writable;
        $resource = &$this->resource;

        // The callback is static and works on references to the object's state rather than on $this
        // (presumably so the watcher does not keep the object alive — confirm against the loop implementation).
        $this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use (
            $writes,
            &$chunkSize,
            &$writable,
            &$resource
        ) {
            static $emptyWrites = 0;

            try {
                while (!$writes->isEmpty()) {
                    /** @var \Amp\Deferred $deferred */
                    list($data, $previous, $deferred) = $writes->shift();
                    $length = \strlen($data);

                    if ($length === 0) {
                        $deferred->resolve(0);
                        continue;
                    }

                    if (!\is_resource($stream) || @\feof($stream)) {
                        throw new StreamException(" The stream was closed by the peer ");
                    }

                    // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
                    // Use conditional, because PHP doesn't like getting null passed
                    if ($chunkSize) {
                        $written = @\fwrite($stream, $data, $chunkSize);
                    } else {
                        $written = @\fwrite($stream, $data);
                    }

                    // PHP 7.4+ returns false for a failed write, so false only indicates misuse on older versions.
                    \assert($written !== false || \PHP_VERSION_ID >= 70400, " Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to. ");

                    // Treat a failed write (false) like a zero-byte write so it goes through the retry/failure
                    // handling below instead of leaking a boolean into substr() and the byte arithmetic.
                    $written = (int) $written;

                    // Broken pipes between processes on macOS/FreeBSD do not detect EOF properly.
                    if ($written === 0) {
                        if ($emptyWrites++ > self::MAX_CONSECUTIVE_EMPTY_WRITES) {
                            $message = " Failed to write to stream after multiple attempts ";
                            if ($error = \error_get_last()) {
                                $message .= \sprintf(" ; %s ", $error["message"]);
                            }
                            throw new StreamException($message);
                        }

                        // Put the chunk back at the front and wait for the next writability event.
                        $writes->unshift([$data, $previous, $deferred]);
                        return;
                    }

                    $emptyWrites = 0;

                    if ($length > $written) {
                        // Partial write: requeue the remainder together with the updated byte count.
                        $data = \substr($data, $written);
                        $writes->unshift([$data, $written + $previous, $deferred]);
                        return;
                    }

                    $deferred->resolve($written + $previous);
                }
            } catch (\Throwable $exception) {
                // Mark the stream as dead and fail the current as well as every queued write.
                $resource = null;
                $writable = false;

                $deferred->fail($exception);
                while (!$writes->isEmpty()) {
                    list(, , $deferred) = $writes->shift();
                    $deferred->fail($exception);
                }

                Loop::cancel($watcher);
            } finally {
                // Nothing left to flush: stop listening for writability until send() queues more data.
                if ($writes->isEmpty()) {
                    Loop::disable($watcher);
                }
            }
        });

        // The watcher is only enabled while there is queued data (see send()).
        Loop::disable($this->watcher);
    }

    /**
     * Writes data to the stream.
     *
     * @param string $data Bytes to write.
     *
     * @return Promise Succeeds once the data has been successfully written to the stream.
     *
     * @throws ClosedException If the stream has already been closed.
     */
    public function write(string $data): Promise
    {
        return $this->send($data, false);
    }

    /**
     * Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before.
     *
     * @param string $finalData Bytes to write.
     *
     * @return Promise Succeeds once the data has been successfully written to the stream.
     *
     * @throws ClosedException If the stream has already been closed.
     */
    public function end(string $finalData = " "): Promise
    {
        return $this->send($finalData, true);
    }

    /**
     * Writes as much of $data as possible right away and queues the remainder for the writability watcher.
     *
     * @param string $data Bytes to write.
     * @param bool $end Whether the stream should be closed once $data has been written.
     *
     * @return Promise Resolves with the number of bytes written, or fails if the stream is not writable,
     *                 was closed by the peer, or a queued write fails later.
     */
    private function send(string $data, bool $end = false): Promise
    {
        if (!$this->writable) {
            return new Failure(new ClosedException(" The stream is not writable "));
        }

        $length = \strlen($data);
        $written = 0;

        if ($end) {
            // Reject any further write()/end() calls from now on.
            $this->writable = false;
        }

        // A direct write is only attempted when nothing is queued, otherwise data would be reordered.
        if ($this->writes->isEmpty()) {
            if ($length === 0) {
                if ($end) {
                    $this->close();
                }
                return new Success(0);
            }

            if (!\is_resource($this->resource) || @\feof($this->resource)) {
                return new Failure(new StreamException(" The stream was closed by the peer "));
            }

            // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
            // Use conditional, because PHP doesn't like getting null passed.
            if ($this->chunkSize) {
                $written = @\fwrite($this->resource, $data, $this->chunkSize);
            } else {
                $written = @\fwrite($this->resource, $data);
            }

            // PHP 7.4+ returns false for a failed write, so false only indicates misuse on older versions.
            \assert($written !== false || \PHP_VERSION_ID >= 70400, " Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to. ");

            // A failed write counts as zero bytes written; the data is queued and the watcher
            // retries it (and eventually fails the promise) instead of working with a boolean here.
            $written = (int) $written;

            if ($length === $written) {
                if ($end) {
                    $this->close();
                }
                return new Success($written);
            }

            $data = \substr($data, $written);
        }

        $deferred = new Deferred;

        if ($length - $written > self::LARGE_CHUNK_SIZE) {
            // Queue large payloads as several chunks; only the last chunk carries the Deferred handed to the caller.
            $chunks = \str_split($data, self::LARGE_CHUNK_SIZE);
            $data = \array_pop($chunks);
            foreach ($chunks as $chunk) {
                $this->writes->push([$chunk, $written, new Deferred]);
                $written += self::LARGE_CHUNK_SIZE;
            }
        }

        $this->writes->push([$data, $written, $deferred]);
        Loop::enable($this->watcher);
        $promise = $deferred->promise();

        if ($end) {
            // Close the stream once the final write has either succeeded or failed.
            $promise->onResolve([$this, "close"]);
        }

        return $promise;
    }

    /**
     * Closes the stream forcefully. Multiple `close()` calls are ignored.
     *
     * @return void
     */
    public function close()
    {
        // is_resource() is false for an already closed resource, for which stream_get_meta_data()
        // would throw a TypeError on PHP 8 that error suppression cannot silence.
        if (\is_resource($this->resource)) {
            // Error suppression, as resource might already be closed
            $meta = @\stream_get_meta_data($this->resource);

            if ($meta && \strpos($meta["mode"], "+") !== false) {
                // Opened for reading and writing: only shut down the writing side.
                @\stream_socket_shutdown($this->resource, \STREAM_SHUT_WR);
            } else {
                @\fclose($this->resource);
            }
        }

        $this->free();
    }

    /**
     * Nulls reference to resource, marks stream unwritable, and fails any pending write.
     */
    private function free()
    {
        $this->resource = null;
        $this->writable = false;

        if (!$this->writes->isEmpty()) {
            $exception = new ClosedException(" The socket was closed before writing completed ");
            do {
                /** @var \Amp\Deferred $deferred */
                list(, , $deferred) = $this->writes->shift();
                $deferred->fail($exception);
            } while (!$this->writes->isEmpty());
        }

        Loop::cancel($this->watcher);
    }

    /**
     * @return resource|null Stream resource, or null once the stream has been closed or a queued write has failed.
     */
    public function getResource()
    {
        return $this->resource;
    }

    /**
     * Changes the chunk size used for subsequent `fwrite()` operations, including those of queued writes.
     *
     * @param int $chunkSize Chunk size per `fwrite()` operation.
     */
    public function setChunkSize(int $chunkSize)
    {
        // Writes through the reference established in the constructor, so the watcher sees the new value.
        $this->chunkSize = $chunkSize;
    }

    /**
     * Releases the watcher and fails pending writes if the stream has not been closed yet.
     * The underlying resource itself is not closed here.
     */
    public function __destruct()
    {
        if ($this->resource !== null) {
            $this->free();
        }
    }
}