1
0
mirror of https://github.com/danog/amp.git synced 2025-01-21 21:01:16 +01:00

Drop disposable interface; rename dispose() to unsubscribe()

This commit is contained in:
Aaron Piotrowski 2016-06-02 10:35:41 -05:00
parent 906d6db47a
commit cef5c90168
8 changed files with 34 additions and 49 deletions

View File

@ -15,12 +15,12 @@ Loop::execute(Amp\coroutine(function () {
$observable = $postponed->getObservable();
$disposable = $observable->subscribe(function ($value) {
$subscriber = $observable->subscribe(function ($value) {
printf("Observable emitted %d\n", $value);
return new Pause(500); // Artificial back-pressure on observable.
});
$disposable->when(function ($exception, $value) {
$subscriber->when(function ($exception, $value) {
if ($exception) {
printf("Observable failed: %s\n", $exception->getMessage());
return;

View File

@ -1,15 +0,0 @@
<?php
namespace Amp;
use Interop\Async\Awaitable;
/**
* Objects returned from \Amp\Observable::subscribe() implement this interface.
*/
interface Disposable extends Awaitable {
/**
* Disposes of the subscriber, failing with an instance of \Amp\DisposedException
*/
public function dispose();
}

View File

@ -1,5 +0,0 @@
<?php
namespace Amp;
class DisposedException extends \RuntimeException {}

View File

@ -3,11 +3,11 @@
namespace Amp\Internal;
use Amp\Coroutine;
use Amp\DisposedException;
use Amp\Future;
use Amp\Observable;
use Amp\Subscriber;
use Amp\Success;
use Amp\UnsubscribedException;
use Interop\Async\Awaitable;
use Interop\Async\Loop;
@ -45,7 +45,7 @@ trait Producer {
/**
* @var callable
*/
private $dispose;
private $unsubscribe;
/**
* Initializes the trait. Use as constructor or call within using class constructor.
@ -53,8 +53,8 @@ trait Producer {
public function init()
{
$this->waiting = new Future;
$this->dispose = function ($id, $exception = null) {
$this->dispose($id, $exception);
$this->unsubscribe = function ($id, $exception = null) {
$this->unsubscribe($id, $exception);
};
}
@ -68,7 +68,7 @@ trait Producer {
return new Subscriber(
$this->nextId++,
$this->result instanceof Awaitable ? $this->result : new Success($this->result),
$this->dispose
$this->unsubscribe
);
}
@ -82,16 +82,16 @@ trait Producer {
$waiting->resolve();
}
return new Subscriber($id, $this->futures[$id], $this->dispose);
return new Subscriber($id, $this->futures[$id], $this->unsubscribe);
}
/**
* @param string $id
* @param \Throwable|\Exception|null $exception
*/
protected function dispose($id, $exception = null) {
protected function unsubscribe($id, $exception = null) {
if (!isset($this->futures[$id])) {
throw new \LogicException("Disposable has already been disposed");
return;
}
$future = $this->futures[$id];
@ -101,7 +101,7 @@ trait Producer {
$this->waiting = new Future;
}
$future->fail($exception ?: new DisposedException);
$future->fail($exception ?: new UnsubscribedException);
}
/**
@ -139,10 +139,10 @@ trait Producer {
try {
if ($value instanceof Observable) {
$disposable = $value->subscribe(function ($value) {
$subscriber = $value->subscribe(function ($value) {
return $this->emit($value);
});
yield Coroutine::result(yield $disposable);
yield Coroutine::result(yield $subscriber);
return;
}
@ -170,9 +170,9 @@ trait Producer {
$awaitables[$id] = $result;
}
} catch (\Throwable $exception) {
$this->dispose($id, $exception);
$this->unsubscribe($id, $exception);
} catch (\Exception $exception) {
$this->dispose($id, $exception);
$this->unsubscribe($id, $exception);
}
}
@ -180,9 +180,9 @@ trait Producer {
try {
yield $awaitable;
} catch (\Throwable $exception) {
$this->dispose($id, $exception);
$this->unsubscribe($id, $exception);
} catch (\Exception $exception) {
$this->dispose($id, $exception);
$this->unsubscribe($id, $exception);
}
}

View File

@ -15,11 +15,11 @@ interface Observable extends Awaitable {
* awaitable, back-pressure is applied to the awaitable until the returned awaitable is resolved.
*
* Exceptions thrown from $onNext (or failures of awaitables returned from $onNext) will fail the returned
* Disposable with the thrown exception.
* Subscriber with the thrown exception.
*
* @param callable $onNext Function invoked each time a value is emitted from the observable.
*
* @return \Amp\Disposable
* @return \Amp\Subscriber
*/
public function subscribe(callable $onNext);
}

View File

@ -14,7 +14,7 @@ namespace Amp;
*/
final class Observer {
/**
* @var \Amp\Disposable
* @var \Amp\Subscriber
*/
private $subscriber;
@ -102,7 +102,7 @@ final class Observer {
*/
public function __destruct() {
if (!$this->resolved) {
$this->subscriber->dispose();
$this->subscriber->unsubscribe();
}
foreach ($this->futures as $future) {

View File

@ -7,7 +7,7 @@ use Interop\Async\Awaitable;
/**
* Disposable implementation returned from implementors of \Amp\Observable.
*/
final class Subscriber implements Disposable {
final class Subscriber implements Awaitable {
/**
* @var string
*/
@ -21,17 +21,17 @@ final class Subscriber implements Disposable {
/**
* @var callable
*/
private $dispose;
private $unsubscribe;
/**
* @param string $id
* @param \Interop\Async\Awaitable $awaitable
* @param callable $dispose
* @param callable $unsubscribe
*/
public function __construct($id, Awaitable $awaitable, callable $dispose) {
public function __construct($id, Awaitable $awaitable, callable $unsubscribe) {
$this->id = $id;
$this->awaitable = $awaitable;
$this->dispose = $dispose;
$this->unsubscribe = $unsubscribe;
}
/**
@ -44,8 +44,8 @@ final class Subscriber implements Disposable {
/**
* {@inheritdoc}
*/
public function dispose() {
$dispose = $this->dispose;
$dispose($this->id);
public function unsubscribe() {
$unsubscribe = $this->unsubscribe;
$unsubscribe($this->id);
}
}

View File

@ -0,0 +1,5 @@
<?php
namespace Amp;
class UnsubscribedException extends \RuntimeException {}