waiting = new Future; $this->unsubscribe = function ($id, $exception = null) { $this->unsubscribe($id, $exception); }; } /** * @param callable $onNext * * @return \Amp\Subscriber */ public function subscribe(callable $onNext) { if ($this->result !== null) { return new Subscriber( $this->nextId++, $this->result instanceof Awaitable ? $this->result : new Success($this->result), $this->unsubscribe ); } $id = $this->nextId++; $this->futures[$id] = $future = new Future; $this->subscribers[$id] = $onNext; if ($this->waiting !== null) { $waiting = $this->waiting; $this->waiting = null; $waiting->resolve(); } return new Subscriber($id, $future, $this->unsubscribe); } /** * @param string $id * @param \Throwable|\Exception|null $exception */ protected function unsubscribe($id, $exception = null) { if (!isset($this->futures[$id])) { return; } $future = $this->futures[$id]; unset($this->subscribers[$id], $this->futures[$id]); if (empty($this->subscribers)) { $this->waiting = new Future; } $future->fail($exception ?: new UnsubscribedException); } /** * Emits a value from the observable. The returned awaitable is resolved with the emitted value once all subscribers * have been invoked. * * @param mixed $value * * @return \Interop\Async\Awaitable * * @throws \LogicException If the observable has resolved. */ protected function emit($value = null) { if ($this->resolved) { throw new \LogicException("The observable has been resolved; cannot emit more values"); } return new Coroutine($this->push($value)); } /** * @coroutine * * @param mixed $value * * @return \Generator * * @throws \InvalidArgumentException * @throws \Throwable|\Exception */ private function push($value) { while ($this->waiting !== null) { yield $this->waiting; } try { if ($value instanceof Observable) { $subscriber = $value->subscribe(function ($value) { return $this->emit($value); }); yield Coroutine::result(yield $subscriber); return; } if ($value instanceof Awaitable) { $value = (yield $value); } } catch (\Throwable $exception) { if (!$this->resolved) { $this->fail($exception); } throw $exception; } catch (\Exception $exception) { if (!$this->resolved) { $this->fail($exception); } throw $exception; } $awaitables = []; foreach ($this->subscribers as $id => $onNext) { try { $result = $onNext($value); if ($result instanceof Awaitable) { $awaitables[$id] = $result; } } catch (\Throwable $exception) { $this->unsubscribe($id, $exception); } catch (\Exception $exception) { $this->unsubscribe($id, $exception); } } foreach ($awaitables as $id => $awaitable) { try { yield $awaitable; } catch (\Throwable $exception) { $this->unsubscribe($id, $exception); } catch (\Exception $exception) { $this->unsubscribe($id, $exception); } } yield Coroutine::result($value); } /** * Resolves the observable with the given value. * * @param mixed $value * * @throws \LogicException If the observable has already been resolved. */ protected function resolve($value = null) { $futures = $this->futures; $this->subscribers = $this->futures = []; if ($this->waiting !== null) { $waiting = $this->waiting; $this->waiting = null; $waiting->resolve(); } $this->complete($value); foreach ($futures as $future) { $future->resolve($value); } } }