1
0
mirror of https://github.com/danog/amp.git synced 2024-11-27 04:24:42 +01:00

Better Emitter fix; fail observable if emitted awaitable fails

This commit is contained in:
Aaron Piotrowski 2016-08-16 13:07:38 -05:00
parent a8a5a1b3a5
commit dae4eb90da
2 changed files with 44 additions and 30 deletions

View File

@ -11,27 +11,19 @@ final class Emitter implements Observable {
/** /**
* @param callable(callable(mixed $value): Awaitable $emit): \Generator $emitter * @param callable(callable(mixed $value): Awaitable $emit): \Generator $emitter
*
* @throws \Error Thrown if the callable does not return a Generator.
*/ */
public function __construct(callable $emitter) { public function __construct(callable $emitter) {
$this->init(); $this->init();
// Defer first emit until next tick in order to give *all* subscribers a chance to subscribe first if (PHP_VERSION_ID >= 70100) {
$pending = new Deferred; $emit = \Closure::fromCallable([$this, 'emit']);
Loop::defer(static function () use (&$pending) { } else {
$temp = $pending; $emit = function ($value): Awaitable {
$pending = null; return $this->emit($value);
$temp->resolve(); };
}); }
$emit = function ($value) use (&$pending): Awaitable {
if ($pending !== null) {
return pipe($pending->getAwaitable(), function () use ($value): Awaitable {
return $this->emit($value);
});
}
return $this->emit($value);
};
$result = $emitter($emit); $result = $emitter($emit);
@ -39,14 +31,20 @@ final class Emitter implements Observable {
throw new \Error("The callable did not return a Generator"); throw new \Error("The callable did not return a Generator");
} }
$coroutine = new Coroutine($result); Loop::defer(function () use ($result) {
$coroutine->when(function ($exception, $value): void { $coroutine = new Coroutine($result);
if ($exception) { $coroutine->when(function ($exception, $value) {
$this->fail($exception); if ($this->resolved) {
return; return;
} }
$this->resolve($value); if ($exception) {
$this->fail($exception);
return;
}
$this->resolve($value);
});
}); });
} }
} }

View File

@ -91,12 +91,28 @@ trait Producer {
$value->subscribe(function ($value) { $value->subscribe(function ($value) {
return $this->emit($value); return $this->emit($value);
}); });
return $value;
$value->when(function ($e) {
if ($e) {
$this->fail($e);
}
});
return $value; // Do not emit observable result.
} }
return pipe($value, function ($value) { $deferred = new Deferred;
return $this->emit($value); $value->when(function ($e, $v) use ($deferred) {
if ($e) {
$this->fail($e);
$deferred->fail($e);
return;
}
$deferred->resolve($this->emit($v));
}); });
return $deferred->getAwaitable();
} }
$awaitables = []; $awaitables = [];
@ -120,7 +136,7 @@ trait Producer {
$deferred = new Deferred; $deferred = new Deferred;
$count = \count($awaitables); $count = \count($awaitables);
$f = function ($e) use ($deferred, $value, &$count) { $f = static function ($e) use ($deferred, $value, &$count) {
if ($e) { if ($e) {
Loop::defer(static function () use ($e) { Loop::defer(static function () use ($e) {
throw $e; throw $e;