diff --git a/lib/Emitter.php b/lib/Emitter.php index b39c192..edcc545 100644 --- a/lib/Emitter.php +++ b/lib/Emitter.php @@ -11,42 +11,40 @@ final class Emitter implements Observable { /** * @param callable(callable(mixed $value): Awaitable $emit): \Generator $emitter + * + * @throws \Error Thrown if the callable does not return a Generator. */ public function __construct(callable $emitter) { $this->init(); - // Defer first emit until next tick in order to give *all* subscribers a chance to subscribe first - $pending = new Deferred; - Loop::defer(static function () use (&$pending) { - $temp = $pending; - $pending = null; - $temp->resolve(); - }); - - $emit = function ($value) use (&$pending): Awaitable { - if ($pending !== null) { - return pipe($pending->getAwaitable(), function () use ($value): Awaitable { - return $this->emit($value); - }); - } - - return $this->emit($value); - }; + if (PHP_VERSION_ID >= 70100) { + $emit = \Closure::fromCallable([$this, 'emit']); + } else { + $emit = function ($value): Awaitable { + return $this->emit($value); + }; + } $result = $emitter($emit); if (!$result instanceof \Generator) { throw new \Error("The callable did not return a Generator"); } - - $coroutine = new Coroutine($result); - $coroutine->when(function ($exception, $value): void { - if ($exception) { - $this->fail($exception); - return; - } - - $this->resolve($value); + + Loop::defer(function () use ($result) { + $coroutine = new Coroutine($result); + $coroutine->when(function ($exception, $value) { + if ($this->resolved) { + return; + } + + if ($exception) { + $this->fail($exception); + return; + } + + $this->resolve($value); + }); }); } } diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index 7b7e5df..bc4b2d0 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -91,12 +91,28 @@ trait Producer { $value->subscribe(function ($value) { return $this->emit($value); }); - return $value; + + $value->when(function ($e) { + if ($e) { + $this->fail($e); + } + }); + + return $value; // Do not emit observable result. } - return pipe($value, function ($value) { - return $this->emit($value); + $deferred = new Deferred; + $value->when(function ($e, $v) use ($deferred) { + if ($e) { + $this->fail($e); + $deferred->fail($e); + return; + } + + $deferred->resolve($this->emit($v)); }); + + return $deferred->getAwaitable(); } $awaitables = []; @@ -120,7 +136,7 @@ trait Producer { $deferred = new Deferred; $count = \count($awaitables); - $f = function ($e) use ($deferred, $value, &$count) { + $f = static function ($e) use ($deferred, $value, &$count) { if ($e) { Loop::defer(static function () use ($e) { throw $e;