1
0
mirror of https://github.com/danog/amp.git synced 2024-12-11 08:59:46 +01:00
amp/lib/Internal/EmitSource.php

316 lines
8.0 KiB
PHP
Raw Normal View History

2020-05-13 17:15:21 +02:00
<?php
namespace Amp\Internal;
use Amp\Deferred;
use Amp\DisposedException;
use Amp\Promise;
use Amp\Stream;
use React\Promise\PromiseInterface as ReactPromise;
/**
2020-05-20 17:59:24 +02:00
* Class used internally by {@see Stream} implementations. Do not use this class in your code, instead compose your
* class from one of the available classes implementing {@see Stream}.
2020-05-13 17:15:21 +02:00
*
* @internal
*
* @template TValue
* @template TSend
*/
2020-05-28 19:59:55 +02:00
final class EmitSource
2020-05-13 17:15:21 +02:00
{
/** @var Promise|null */
private $result;
/** @var bool */
private $completed = false;
/** @var mixed[] */
2020-05-28 19:59:55 +02:00
private $emittedValues = [];
2020-05-13 17:15:21 +02:00
/** @var Promise[] */
private $sendValues = [];
/** @var Deferred[] */
private $backPressure = [];
/** @var Deferred[] */
private $waiting = [];
/** @var int */
private $consumePosition = 0;
/** @var int */
2020-05-28 19:59:55 +02:00
private $emitPosition = 0;
2020-05-13 17:15:21 +02:00
/** @var array|null */
private $resolutionTrace;
/** @var bool */
private $disposed = false;
/** @var bool */
private $used = false;
/**
2020-05-21 17:11:22 +02:00
* @return Promise<mixed|null>
2020-05-16 17:39:34 +02:00
*
2020-05-21 17:11:22 +02:00
* @psalm-return Promise<TValue|null>
2020-05-13 17:15:21 +02:00
*/
public function continue(): Promise
{
return $this->next(Promise\succeed());
2020-05-13 17:15:21 +02:00
}
/**
* @param mixed $value
*
* @psalm-param TSend $value
*
2020-05-21 17:11:22 +02:00
* @return Promise<mixed|null>
2020-05-16 17:39:34 +02:00
*
2020-05-21 17:11:22 +02:00
* @psalm-return Promise<TValue|null>
2020-05-13 17:15:21 +02:00
*/
public function send($value): Promise
{
if ($this->consumePosition === 0) {
throw new \Error("Must initialize async generator by calling continue() first");
}
return $this->next(Promise\succeed($value));
2020-05-13 17:15:21 +02:00
}
/**
* @param \Throwable $exception
*
2020-05-21 17:11:22 +02:00
* @return Promise<mixed|null>
2020-05-16 17:39:34 +02:00
*
2020-05-21 17:11:22 +02:00
* @psalm-return Promise<TValue|null>
2020-05-13 17:15:21 +02:00
*/
public function throw(\Throwable $exception): Promise
{
if ($this->consumePosition === 0) {
throw new \Error("Must initialize async generator by calling continue() first");
}
return $this->next(Promise\fail($exception));
2020-05-13 17:15:21 +02:00
}
/**
2020-05-16 17:39:34 +02:00
* @param Promise<mixed> $promise
*
2020-05-18 20:49:56 +02:00
* @psalm-param Promise<TSend|null> $promise
2020-05-13 17:15:21 +02:00
*
2020-05-21 17:11:22 +02:00
* @return Promise<mixed|null>
2020-05-16 17:39:34 +02:00
*
2020-05-21 17:11:22 +02:00
* @psalm-return Promise<TValue|null>
2020-05-13 17:15:21 +02:00
*/
private function next(Promise $promise): Promise
{
$position = $this->consumePosition++;
if (isset($this->backPressure[$position - 1])) {
$deferred = $this->backPressure[$position - 1];
unset($this->backPressure[$position - 1]);
$deferred->resolve($promise);
} elseif ($position > 0) {
// Send-values are indexed as $this->consumePosition - 1.
$this->sendValues[$position - 1] = $promise;
}
2020-05-28 19:59:55 +02:00
if (\array_key_exists($position, $this->emittedValues)) {
$value = $this->emittedValues[$position];
unset($this->emittedValues[$position]);
2020-05-13 17:15:21 +02:00
return Promise\succeed($value);
2020-05-13 17:15:21 +02:00
}
if ($this->result) {
return $this->result;
}
$this->waiting[$position] = $deferred = new Deferred;
return $deferred->promise();
}
public function stream(): Stream
2020-05-13 17:15:21 +02:00
{
if ($this->used) {
throw new \Error("A stream may be started only once");
}
$this->used = true;
return new AutoDisposingStream($this);
2020-05-13 17:15:21 +02:00
}
/**
* @return void
*/
public function dispose()
2020-05-13 17:15:21 +02:00
{
if ($this->result) {
return; // Stream already completed or failed.
}
2020-05-13 17:15:21 +02:00
$this->finalize(Promise\fail(new DisposedException), true);
2020-05-13 17:15:21 +02:00
}
/**
2020-05-28 19:59:55 +02:00
* Emits a value from the stream. The returned promise is resolved once the emitted value has been consumed or
2020-05-13 17:15:21 +02:00
* if the stream is completed, failed, or disposed.
*
* @param mixed $value
*
* @psalm-param TValue $value
*
2020-05-18 20:49:56 +02:00
* @return Promise<mixed> Resolves with the sent value once the value has been consumed. Fails with the failure
* reason if the {@see fail()} is called, or with {@see DisposedException} if the stream
* is destroyed.
2020-05-13 17:15:21 +02:00
*
2020-05-18 20:49:56 +02:00
* @psalm-return Promise<TSend|null>
2020-05-13 17:15:21 +02:00
*
* @throws \Error If the stream has completed.
*/
2020-05-28 19:59:55 +02:00
public function emit($value): Promise
2020-05-13 17:15:21 +02:00
{
if ($this->result) {
if ($this->disposed) {
return $this->result; // Promise failed with an instance of DisposedException.
}
2020-05-28 19:59:55 +02:00
throw new \Error("Streams cannot emit values after calling complete");
2020-05-13 17:15:21 +02:00
}
2020-05-21 17:11:22 +02:00
if ($value === null) {
2020-05-28 19:59:55 +02:00
throw new \TypeError("Streams cannot emit NULL");
}
2020-05-13 17:15:21 +02:00
if ($value instanceof Promise || $value instanceof ReactPromise) {
2020-05-28 19:59:55 +02:00
throw new \TypeError("Streams cannot emit promises");
2020-05-13 17:15:21 +02:00
}
2020-05-28 19:59:55 +02:00
$position = $this->emitPosition++;
2020-05-13 17:15:21 +02:00
if (isset($this->waiting[$position])) {
$deferred = $this->waiting[$position];
unset($this->waiting[$position]);
2020-05-21 17:11:22 +02:00
$deferred->resolve($value);
2020-05-13 17:15:21 +02:00
// Send-values are indexed as $this->consumePosition - 1, so use $position for the next value.
if (isset($this->sendValues[$position])) {
$promise = $this->sendValues[$position];
unset($this->sendValues[$position]);
return $promise;
}
} else {
2020-05-28 19:59:55 +02:00
$this->emittedValues[$position] = $value;
2020-05-13 17:15:21 +02:00
}
$this->backPressure[$position] = $deferred = new Deferred;
return $deferred->promise();
}
/**
* @return bool True if the stream has been completed or failed.
*/
public function isComplete(): bool
{
return $this->completed;
}
2020-07-17 18:22:13 +02:00
/**
* @return bool True if the stream was disposed.
*/
public function isDisposed(): bool
{
return $this->disposed;
}
2020-05-13 17:15:21 +02:00
/**
* Completes the stream.
**
* @return void
*
* @throws \Error If the iterator has already been completed.
*/
public function complete()
{
$this->finalize(Promise\succeed());
2020-05-13 17:15:21 +02:00
}
/**
* Fails the stream.
*
* @param \Throwable $exception
*
* @return void
*/
public function fail(\Throwable $exception)
{
$this->finalize(Promise\fail($exception));
2020-05-13 17:15:21 +02:00
}
/**
* @param Promise $result Promise with the generator result, either a null success or a failed promise.
* @param bool $disposed Flag if the generator was disposed.
*
* @return void
*/
private function finalize(Promise $result, bool $disposed = false)
{
if ($this->completed) {
$message = "Stream has already been completed";
if (isset($this->resolutionTrace)) {
$trace = formatStacktrace($this->resolutionTrace);
$message .= ". Previous completion trace:\n\n{$trace}\n\n";
} else {
// @codeCoverageIgnoreStart
$message .= ", define environment variable AMP_DEBUG or const AMP_DEBUG = true and enable assertions "
. "for a stacktrace of the previous resolution.";
// @codeCoverageIgnoreEnd
}
throw new \Error($message);
}
$this->completed = !$disposed;
$this->disposed = $disposed;
if ($this->result) {
return;
}
\assert((function () {
2020-07-11 17:13:51 +02:00
if (isDebugEnabled()) {
2020-05-13 17:15:21 +02:00
$trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
\array_shift($trace); // remove current closure
$this->resolutionTrace = $trace;
}
return true;
})());
$this->result = $result;
$waiting = $this->waiting;
$this->waiting = [];
foreach ($waiting as $deferred) {
$deferred->resolve($result);
}
if ($disposed) {
$backPressure = $this->backPressure;
$this->backPressure = [];
foreach ($backPressure as $deferred) {
$deferred->resolve($result);
}
}
}
}