resolved) { return; } $this->subscribers[] = $onNext; } /** * Emits a value from the observable. The returned promise is resolved with the emitted value once all subscribers * have been invoked. * * @param mixed $value * * @return \Interop\Async\Promise * * @throws \Error If the observable has resolved. */ private function emit($value): Promise { if ($this->resolved) { throw new \Error("The observable has been resolved; cannot emit more values"); } if ($value instanceof Promise) { $deferred = new Deferred; $value->when(function ($e, $v) use ($deferred) { if ($this->resolved) { $deferred->fail( new \Error("The observable was resolved before the promise result could be emitted") ); return; } if ($e) { $this->fail($e); $deferred->fail($e); return; } $deferred->resolve($this->emit($v)); }); return $deferred->promise(); } $promises = []; foreach ($this->subscribers as $onNext) { try { $result = $onNext($value); if ($result instanceof Promise) { $promises[] = $result; } } catch (\Throwable $e) { ErrorHandler::notify($e); } } if (!$promises) { return new Success($value); } $deferred = new Deferred; $count = \count($promises); $f = static function ($e) use ($deferred, $value, &$count) { if ($e) { ErrorHandler::notify($e); } if (!--$count) { $deferred->resolve($value); } }; foreach ($promises as $promise) { $promise->when($f); } return $deferred->promise(); } /** * Resolves the observable with the given value. * * @param mixed $value * * @throws \Error If the observable has already been resolved. */ private function resolve($value = null) { $this->complete($value); $this->subscribers = []; } }