mirror of
https://github.com/danog/amp.git
synced 2025-01-22 13:21:16 +01:00
Use fiber directly when emitting values
This commit is contained in:
parent
5d026083df
commit
865238bc16
@ -9,7 +9,7 @@ use Amp\Loop;
|
||||
use Amp\Pipeline;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use function Amp\await;
|
||||
use function Amp\defer;
|
||||
|
||||
/**
|
||||
* Class used internally by {@see Pipeline} implementations. Do not use this class in your code, instead compose your
|
||||
@ -22,16 +22,20 @@ use function Amp\await;
|
||||
*/
|
||||
final class EmitSource
|
||||
{
|
||||
private ?Promise $result = null;
|
||||
|
||||
private bool $completed = false;
|
||||
|
||||
private \Throwable $exception;
|
||||
|
||||
/** @var mixed[] */
|
||||
private array $emittedValues = [];
|
||||
|
||||
/** @var Promise[] */
|
||||
private array $sendValues = [];
|
||||
|
||||
/** @var Deferred[] */
|
||||
private array $backPressure = [];
|
||||
|
||||
/** @var \Fiber[] */
|
||||
private array $waiting = [];
|
||||
|
||||
private int $consumePosition = 0;
|
||||
@ -44,6 +48,7 @@ final class EmitSource
|
||||
|
||||
private bool $used = false;
|
||||
|
||||
/** @var callable[]|null */
|
||||
private ?array $onDisposal = [];
|
||||
|
||||
/**
|
||||
@ -101,18 +106,17 @@ final class EmitSource
|
||||
if (\array_key_exists($position, $this->emittedValues)) {
|
||||
$value = $this->emittedValues[$position];
|
||||
unset($this->emittedValues[$position]);
|
||||
|
||||
// Defer next value to avoid creating a blocking loop.
|
||||
return await(new Success($value));
|
||||
return $value;
|
||||
}
|
||||
|
||||
if ($this->result) {
|
||||
return await($this->result);
|
||||
if ($this->completed || $this->disposed) {
|
||||
if (isset($this->exception)) {
|
||||
throw $this->exception;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
$this->waiting[$position] = $deferred = new Deferred;
|
||||
|
||||
return await($deferred->promise());
|
||||
return \Fiber::suspend(fn(\Fiber $fiber) => $this->waiting[$position] = $fiber, Loop::get());
|
||||
}
|
||||
|
||||
public function pipe(): Pipeline
|
||||
@ -144,11 +148,11 @@ final class EmitSource
|
||||
private function cancel(bool $cancelPending): void
|
||||
{
|
||||
try {
|
||||
if ($this->result) {
|
||||
if ($this->completed || $this->disposed) {
|
||||
return; // Pipeline already completed or failed.
|
||||
}
|
||||
|
||||
$this->finalize(new Failure(new DisposedException), true);
|
||||
$this->finalize(new DisposedException, true);
|
||||
} finally {
|
||||
if ($this->disposed && $cancelPending) {
|
||||
$this->triggerDisposal();
|
||||
@ -166,17 +170,11 @@ final class EmitSource
|
||||
public function onDisposal(callable $onDisposal): void
|
||||
{
|
||||
if ($this->disposed) {
|
||||
try {
|
||||
$onDisposal();
|
||||
} catch (\Throwable $e) {
|
||||
Loop::defer(static function () use ($e) {
|
||||
throw $e;
|
||||
});
|
||||
}
|
||||
defer($onDisposal);
|
||||
return;
|
||||
}
|
||||
|
||||
if ($this->result) {
|
||||
if ($this->completed) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -212,9 +210,9 @@ final class EmitSource
|
||||
$position = $this->emitPosition++;
|
||||
|
||||
if (isset($this->waiting[$position])) {
|
||||
$deferred = $this->waiting[$position];
|
||||
$fiber = $this->waiting[$position];
|
||||
unset($this->waiting[$position]);
|
||||
$deferred->resolve($value);
|
||||
Loop::defer(fn() => $fiber->resume($value));
|
||||
|
||||
// Send-values are indexed as $this->consumePosition - 1, so use $position for the next value.
|
||||
if (isset($this->sendValues[$position])) {
|
||||
@ -222,12 +220,10 @@ final class EmitSource
|
||||
unset($this->sendValues[$position]);
|
||||
return $promise;
|
||||
}
|
||||
} elseif ($this->result) {
|
||||
if ($this->completed) {
|
||||
throw new \Error("Pipelines cannot emit values after calling complete");
|
||||
}
|
||||
|
||||
return $this->result;
|
||||
} elseif ($this->completed) {
|
||||
throw new \Error("Pipelines cannot emit values after calling complete");
|
||||
} elseif (isset($this->exception)) {
|
||||
return new Failure($this->exception);
|
||||
} else {
|
||||
$this->emittedValues[$position] = $value;
|
||||
}
|
||||
@ -270,7 +266,7 @@ final class EmitSource
|
||||
*/
|
||||
public function complete(): void
|
||||
{
|
||||
$this->finalize(new Success);
|
||||
$this->finalize();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -286,16 +282,16 @@ final class EmitSource
|
||||
throw new \Error("Cannot fail a pipeline with an instance of " . DisposedException::class);
|
||||
}
|
||||
|
||||
$this->finalize(new Failure($exception));
|
||||
$this->finalize($exception);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Promise $result
|
||||
* @param bool $disposed Flag if the generator was disposed.
|
||||
* @param \Throwable|null $exception Failure reason or null for success.
|
||||
* @param bool $disposed Flag if the generator was disposed.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function finalize(Promise $result, bool $disposed = false): void
|
||||
private function finalize(?\Throwable $exception = null, bool $disposed = false): void
|
||||
{
|
||||
if ($this->completed) {
|
||||
$message = "Pipeline has already been completed";
|
||||
@ -328,18 +324,20 @@ final class EmitSource
|
||||
})());
|
||||
}
|
||||
|
||||
if ($this->result) {
|
||||
if (isset($this->exception)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->result = $result;
|
||||
if ($exception !== null) {
|
||||
$this->exception = $exception;
|
||||
}
|
||||
|
||||
if ($this->disposed) {
|
||||
if (empty($this->waiting)) {
|
||||
$this->triggerDisposal();
|
||||
}
|
||||
} else {
|
||||
$this->resolvePending();
|
||||
Loop::defer(fn() => $this->resolvePending());
|
||||
}
|
||||
}
|
||||
|
||||
@ -355,11 +353,19 @@ final class EmitSource
|
||||
$this->waiting = [];
|
||||
|
||||
foreach ($backPressure as $deferred) {
|
||||
$deferred->resolve($this->result);
|
||||
if (isset($this->exception)) {
|
||||
$deferred->fail($this->exception);
|
||||
} else {
|
||||
$deferred->resolve();
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($waiting as $deferred) {
|
||||
$deferred->resolve($this->result);
|
||||
foreach ($waiting as $fiber) {
|
||||
if (isset($this->exception)) {
|
||||
$fiber->throw($this->exception);
|
||||
} else {
|
||||
$fiber->resume();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -377,17 +383,11 @@ final class EmitSource
|
||||
$onDisposal = $this->onDisposal;
|
||||
$this->onDisposal = null;
|
||||
|
||||
$this->resolvePending();
|
||||
Loop::defer(fn() => $this->resolvePending());
|
||||
|
||||
/** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */
|
||||
foreach ($onDisposal as $callback) {
|
||||
try {
|
||||
$callback();
|
||||
} catch (\Throwable $e) {
|
||||
Loop::defer(static function () use ($e) {
|
||||
throw $e;
|
||||
});
|
||||
}
|
||||
defer($callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -217,6 +217,8 @@ class PipelineSourceTest extends AsyncTestCase
|
||||
$pipeline = $this->source->pipe();
|
||||
$pipeline->dispose();
|
||||
|
||||
delay(0);
|
||||
|
||||
$this->assertTrue($invoked);
|
||||
|
||||
$this->source->onDisposal($this->createCallback(1));
|
||||
@ -239,6 +241,8 @@ class PipelineSourceTest extends AsyncTestCase
|
||||
$this->assertFalse($invoked);
|
||||
|
||||
$this->source->onDisposal($this->createCallback(0));
|
||||
|
||||
delay(0);
|
||||
}
|
||||
|
||||
public function testEmitAfterDisposal(): void
|
||||
|
Loading…
x
Reference in New Issue
Block a user