1
0
mirror of https://github.com/danog/amp.git synced 2024-12-12 09:29:45 +01:00

Update for Continuation

This commit is contained in:
Aaron Piotrowski 2020-11-05 11:29:31 -06:00
parent cff6bba499
commit f3b189f33f
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
4 changed files with 54 additions and 48 deletions

View File

@ -35,7 +35,7 @@ final class EmitSource
/** @var Deferred[] */ /** @var Deferred[] */
private array $backPressure = []; private array $backPressure = [];
/** @var \Fiber[] */ /** @var \Continuation[] */
private array $waiting = []; private array $waiting = [];
private int $consumePosition = 0; private int $consumePosition = 0;
@ -116,7 +116,10 @@ final class EmitSource
return null; return null;
} }
return \Fiber::suspend(fn(\Fiber $fiber) => $this->waiting[$position] = $fiber, Loop::get()); return \Fiber::suspend(
fn(\Continuation $continuation) => $this->waiting[$position] = $continuation,
Loop::get()
);
} }
public function pipe(): Pipeline public function pipe(): Pipeline
@ -210,9 +213,9 @@ final class EmitSource
$position = $this->emitPosition++; $position = $this->emitPosition++;
if (isset($this->waiting[$position])) { if (isset($this->waiting[$position])) {
$fiber = $this->waiting[$position]; $continuation = $this->waiting[$position];
unset($this->waiting[$position]); unset($this->waiting[$position]);
Loop::defer(fn() => $fiber->resume($value)); Loop::defer(static fn() => $continuation->resume($value));
// Send-values are indexed as $this->consumePosition - 1, so use $position for the next value. // Send-values are indexed as $this->consumePosition - 1, so use $position for the next value.
if (isset($this->sendValues[$position])) { if (isset($this->sendValues[$position])) {
@ -356,11 +359,11 @@ final class EmitSource
} }
} }
foreach ($waiting as $fiber) { foreach ($waiting as $continuation) {
if (isset($this->exception)) { if (isset($this->exception)) {
$fiber->throw($this->exception); $continuation->throw($this->exception);
} else { } else {
$fiber->resume(); $continuation->resume();
} }
} }
} }

View File

@ -25,10 +25,10 @@ namespace Amp
$promise = Promise\all($promise); $promise = Promise\all($promise);
} }
return \Fiber::suspend(static fn(\Fiber $fiber) => $promise->onResolve( return \Fiber::suspend(static fn(\Continuation $continuation) => $promise->onResolve(
static fn(?\Throwable $exception, mixed $value) => $exception static fn(?\Throwable $exception, mixed $value) => $exception
? $fiber->throw($exception) ? $continuation->throw($exception)
: $fiber->resume($value) : $continuation->resume($value)
), Loop::get()); ), Loop::get());
} }

32
stubs/Continuation.php Normal file
View File

@ -0,0 +1,32 @@
<?php
final class Continuation
{
/**
* @return bool True if either {@see resume()} or {@see throw()} has been called previously.
*/
public function continued(): bool { }
/**
* Resumes the fiber, returning the given value from {@see Fiber::suspend()}.
*
* @param mixed $value
*
* @throw FiberError If the continuation has already been used.
*/
public function resume(mixed $value = null): void { }
/**
* Throws the given exception into the fiber from {@see Fiber::suspend()}.
*
* @param Throwable $exception
*
* @throw FiberError If the continuation has already been used.
*/
public function throw(Throwable $exception): void { }
/**
* Cannot be constructed by user code.
*/
private function __construct() { }
}

View File

@ -11,50 +11,21 @@ final class Fiber
public static function run(callable $callback, mixed ...$args): void { } public static function run(callable $callback, mixed ...$args): void { }
/** /**
* Private constructor to force use of {@see run()}. * Suspend execution of the fiber. A Continuation object is provided as the first argument to the given callback.
*/ * The fiber may be resumed with {@see Continuation::resume()} or {@see Continuation::throw()}.
private function __construct() { }
/**
* @return bool True if the fiber is suspended.
*/
public function isSuspended(): bool { }
/**
* @return bool True if the fiber is currently running.
*/
public function isRunning(): bool { }
/**
* @return bool True if the fiber has completed execution.
*/
public function isTerminated(): bool { }
/**
* Resumes the fiber, returning the given value from {@see Fiber::suspend()}.
*
* @param mixed $value
*/
public function resume(mixed $value = null): void { }
/**
* Throws the given exception into the fiber from {@see Fiber::suspend()}.
*
* @param Throwable $exception
*/
public function throw(Throwable $exception): void { }
/**
* Suspend execution of the fiber. The Fiber object is provided as the first argument to the given callback.
* The fiber may be resumed with {@see Fiber::resume()} or {@see Fiber::throw()}.
* *
* @param callable(Fiber):void $enqueue * @param callable(Fiber):void $enqueue
* @param FiberScheduler $scheduler * @param FiberScheduler $scheduler
* *
* @return mixed Value provided to {@see Fiber::resume()}. * @return mixed Value provided to {@see Continuation::resume()}.
* *
* @throws FiberError Thrown if within {@see FiberScheduler::run()}. * @throws FiberError Thrown if within {@see FiberScheduler::run()}.
* @throws Throwable Exception provided to {@see Fiber::throw()}. * @throws Throwable Exception provided to {@see Continuation::throw()}.
*/ */
public static function suspend(callable $enqueue, FiberScheduler $scheduler): mixed { } public static function suspend(callable $enqueue, FiberScheduler $scheduler): mixed { }
/**
* Private constructor to force use of {@see run()}.
*/
private function __construct() { }
} }