1
0
mirror of https://github.com/danog/amp.git synced 2025-01-21 21:01:16 +01:00

Prevent emit without subscribers

This commit is contained in:
Aaron Piotrowski 2016-05-31 23:02:59 -05:00
parent eb49e6e8ff
commit cc431a0374
4 changed files with 43 additions and 7 deletions

View File

@ -7,6 +7,7 @@ namespace Amp;
*/
final class Emitter implements Observable {
use Internal\Producer {
init as __construct;
emit as public;
resolve as public;
fail as public;

View File

@ -14,6 +14,8 @@ final class PrivateObservable implements Observable {
* @param callable(callable $emit, callable $complete, callable $fail): void $emitter
*/
public function __construct(callable $emitter) {
$this->init();
/**
* Emits a value from the observable.
*

View File

@ -9,6 +9,7 @@ use Amp\Observable;
use Amp\Subscriber;
use Amp\Success;
use Interop\Async\Awaitable;
use Interop\Async\Loop;
trait Producer {
use Placeholder {
@ -20,6 +21,11 @@ trait Producer {
*/
private $subscribers = [];
/**
* @var \Amp\Future|null
*/
private $waiting;
/**
* @var \Amp\Future[]
*/
@ -35,18 +41,23 @@ trait Producer {
*/
private $dispose;
/**
* Initializes the trait. Use as constructor or call within using class constructor.
*/
public function init()
{
$this->waiting = new Future;
$this->dispose = function ($id, $exception = null) {
$this->dispose($id, $exception);
};
}
/**
* @param callable $onNext
*
* @return \Amp\Subscriber
*/
public function subscribe(callable $onNext) {
if ($this->dispose === null) {
$this->dispose = function ($id, $exception = null) {
$this->dispose($id, $exception);
};
}
if ($this->result !== null) {
return new Subscriber(
$this->nextId++,
@ -59,6 +70,12 @@ trait Producer {
$this->futures[$id] = new Future;
$this->subscribers[$id] = $onNext;
if ($this->waiting !== null) {
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve();
}
return new Subscriber($id, $this->futures[$id], $this->dispose);
}
@ -73,7 +90,12 @@ trait Producer {
$future = $this->futures[$id];
unset($this->subscribers[$id], $this->futures[$id]);
$future->fail($exception ?: new DisposedException());
if (empty($this->subscribers)) {
$this->waiting = new Future;
}
$future->fail($exception ?: new DisposedException);
}
/**
@ -105,6 +127,10 @@ trait Producer {
* @throws \Throwable|\Exception
*/
private function push($value) {
while ($this->waiting !== null) {
yield $this->waiting;
}
try {
if ($value instanceof Observable) {
$disposable = $value->subscribe(function ($value) {
@ -168,6 +194,12 @@ trait Producer {
$futures = $this->futures;
$this->subscribers = $this->futures = [];
if ($this->waiting !== null) {
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve();
}
$this->complete($value);
foreach ($futures as $future) {

View File

@ -7,6 +7,7 @@ try {
production: // PHP 7 production environment (zend.assertions=0)
final class Postponed implements Observable {
use Internal\Producer {
init as __construct;
emit as public;
resolve as public;
fail as public;