2017-04-30 08:31:53 +02:00
< ? php
namespace Amp\ByteStream ;
use Amp\Deferred ;
use Amp\Failure ;
use Amp\Loop ;
use Amp\Promise ;
use Amp\Success ;
2017-05-25 18:12:12 +02:00
/**
* Output stream abstraction for PHP's stream resources.
*/
2017-05-22 14:38:39 +02:00
final class ResourceOutputStream implements OutputStream {
    /** @var resource|null Stream being written to; null once closed or after a failed write. */
    private $resource;

    /** @var string Identifier of the writability watcher registered with the event loop. */
    private $watcher;

    /** @var \SplQueue Pending writes, each stored as [remaining data, bytes already written, Deferred]. */
    private $writes;

    /** @var bool False once end() has been called, the stream was closed, or a write failed. */
    private $writable = true;

    /** @var int|null Maximum number of bytes handed to a single fwrite() call; null/0 means no limit. */
    private $chunkSize;

    /**
     * Puts the stream into non-blocking, unbuffered mode and registers a (initially disabled) writability
     * watcher that drains the queue of pending writes.
     *
     * @param resource $stream Stream resource.
     * @param int|null $chunkSize Chunk size per `fwrite()` operation.
     *
     * @throws \Error If $stream is not a stream resource or was opened read-only.
     */
    public function __construct($stream, int $chunkSize = null) {
        if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
            throw new \Error(" Expected a valid stream ");
        }

        $meta = \stream_get_meta_data($stream);

        // A mode containing "r" but no "+" (e.g. "r", "rb") is read-only.
        if (\strpos($meta["mode"], "r") !== false && \strpos($meta["mode"], "+") === false) {
            throw new \Error(" Expected a writable stream ");
        }

        \stream_set_blocking($stream, false);
        \stream_set_write_buffer($stream, 0);

        $this->resource = $stream;
        $this->chunkSize = $chunkSize;

        $writes = $this->writes = new \SplQueue;

        // State is shared with the static closure by reference rather than through $this
        // (presumably so the loop's watcher does not keep this object alive).
        $writable = &$this->writable;
        $resource = &$this->resource;

        $this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, $chunkSize, &$writable, &$resource) {
            $firstWrite = true;

            try {
                while (!$writes->isEmpty()) {
                    /** @var \Amp\Deferred $deferred */
                    list($data, $previous, $deferred) = $writes->shift();
                    $length = \strlen($data);

                    if ($length === 0) {
                        $deferred->resolve(0);
                        continue;
                    }

                    $written = self::writeChunk($stream, $data, $chunkSize);

                    // PHP >= 7.4 reports failed writes as false. 7.4.0/7.4.1 reportedly could also do so for a
                    // merely full buffer, so false is only treated as definitive from 7.4.2 on; before that it
                    // is handled like a zero-byte write.
                    $failed = $written === false && \PHP_VERSION_ID >= 70402;
                    $written = (int) $written;

                    if ($failed || $written === 0) {
                        // fwrite will also return 0 if the buffer is already full. Unless this was the first
                        // write of this invocation, retry on the next writability callback, which guarantees
                        // that the buffer isn't full.
                        if (!$failed && !$firstWrite) {
                            $writes->unshift([$data, $previous, $deferred]);
                            return;
                        }

                        $resource = null;
                        $writable = false;

                        $exception = self::createWriteException();
                        $deferred->fail($exception);

                        while (!$writes->isEmpty()) {
                            list(, , $deferred) = $writes->shift();
                            $deferred->fail($exception);
                        }

                        Loop::cancel($watcher);
                        return;
                    }

                    if ($length > $written) {
                        // Partial write: requeue the remainder at the front and wait for the next callback.
                        $writes->unshift([\substr($data, $written), $written + $previous, $deferred]);
                        return;
                    }

                    $deferred->resolve($written + $previous);
                    $firstWrite = false;
                }
            } finally {
                if ($writes->isEmpty()) {
                    Loop::disable($watcher);
                }
            }
        });

        // Nothing is queued yet; send() enables the watcher when it has to defer a write.
        Loop::disable($this->watcher);
    }

    /**
     * Writes data to the stream.
     *
     * @param string $data Bytes to write.
     *
     * @return Promise Succeeds with the number of bytes written once the data has been written to the stream.
     *     Fails with a StreamException if the stream is not writable or the write fails.
     */
    public function write(string $data): Promise {
        return $this->send($data, false);
    }

    /**
     * Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before.
     *
     * @param string $finalData Bytes to write.
     *
     * @return Promise Succeeds with the number of bytes written once the data has been written to the stream.
     *     Fails with a StreamException if the stream is not writable or the write fails.
     */
    public function end(string $finalData = ""): Promise {
        return $this->send($finalData, true);
    }

    /**
     * Attempts an immediate write if nothing is queued; whatever could not be written is queued for the watcher.
     *
     * @param string $data Bytes to write.
     * @param bool $end Whether the stream should be closed once this write has completed.
     *
     * @return Promise Resolved with the total number of bytes written for this call.
     */
    private function send(string $data, bool $end = false): Promise {
        if (!$this->writable) {
            return new Failure(new StreamException(" The stream is not writable "));
        }

        $length = \strlen($data);
        $written = 0;

        if ($end) {
            $this->writable = false;
        }

        if ($this->writes->isEmpty()) {
            if ($length === 0) {
                if ($end) {
                    $this->close();
                }

                return new Success(0);
            }

            $written = self::writeChunk($this->resource, $data, $this->chunkSize);

            if ($written === false && \PHP_VERSION_ID >= 70402) {
                // Build the failure first: close() may overwrite the error reported by error_get_last().
                $failure = new Failure(self::createWriteException());

                if ($end) {
                    $this->close();
                }

                return $failure;
            }

            // On older PHP versions a false return is handled like a zero-byte write and retried by the watcher.
            $written = (int) $written;

            if ($length === $written) {
                if ($end) {
                    $this->close();
                }

                return new Success($written);
            }

            $data = \substr($data, $written);
        }

        $deferred = new Deferred;
        $this->writes->push([$data, $written, $deferred]);
        Loop::enable($this->watcher);
        $promise = $deferred->promise();

        if ($end) {
            $promise->onResolve([$this, "close"]);
        }

        return $promise;
    }

    /**
     * Performs a single fwrite() call, limited to $chunkSize bytes when a chunk size is set.
     *
     * @param resource $stream Stream to write to.
     * @param string $data Bytes to write.
     * @param int|null $chunkSize Maximum number of bytes to write, or null/0 for no limit.
     *
     * @return int|false Raw fwrite() result.
     */
    private static function writeChunk($stream, string $data, $chunkSize) {
        // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
        // Use conditional, because PHP doesn't like getting null passed.
        if ($chunkSize) {
            $written = @\fwrite($stream, $data, $chunkSize);
        } else {
            $written = @\fwrite($stream, $data);
        }

        // Before PHP 7.4, false indicates an invalid (closed) resource; since 7.4 it is also returned for
        // ordinary write failures, which callers handle at runtime.
        \assert($written !== false || \PHP_VERSION_ID >= 70400, " Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to. ");

        return $written;
    }

    /**
     * Builds the exception used to fail writes, appending the last PHP error if one is available.
     *
     * @return StreamException
     */
    private static function createWriteException(): StreamException {
        $message = " Failed to write to socket ";

        if ($error = \error_get_last()) {
            $message .= \sprintf(" Errno: %d; %s ", $error["type"], $error["message"]);
        }

        return new StreamException($message);
    }

    /**
     * Closes the stream forcefully. Multiple `close()` calls are ignored.
     *
     * Streams opened with a "+" mode only have their write side shut down; all other streams are closed.
     *
     * @return void
     */
    public function close() {
        // is_resource() is false for resources that were already closed elsewhere, in which case there is
        // nothing left to shut down and the stream functions must not be called on it.
        if (\is_resource($this->resource)) {
            $meta = \stream_get_meta_data($this->resource);

            if (\strpos($meta["mode"], "+") !== false) {
                \stream_socket_shutdown($this->resource, \STREAM_SHUT_WR);
            } else {
                @\fclose($this->resource);
            }
        }

        $this->free();
    }

    /**
     * Nulls reference to resource, marks stream unwritable, fails any pending write and cancels the watcher.
     */
    private function free() {
        $this->resource = null;
        $this->writable = false;

        if (!$this->writes->isEmpty()) {
            $exception = new ClosedException(" The socket was closed before writing completed ");

            do {
                /** @var \Amp\Deferred $deferred */
                list(, , $deferred) = $this->writes->shift();
                $deferred->fail($exception);
            } while (!$this->writes->isEmpty());
        }

        Loop::cancel($this->watcher);
    }

    /**
     * @return resource|null Stream resource or null once the stream has been closed or a write has failed.
     */
    public function getResource() {
        return $this->resource;
    }

    /**
     * Releases the watcher and fails pending writes if the stream is still held when the object is destroyed.
     */
    public function __destruct() {
        if ($this->resource !== null) {
            $this->free();
        }
    }
}