1
0
mirror of https://github.com/danog/amp.git synced 2024-12-02 09:27:46 +01:00

Merge pull request #111 from amphp/onEmit

Rename Stream::listen to Stream::onEmit
This commit is contained in:
Aaron Piotrowski 2017-03-21 14:05:45 -05:00 committed by GitHub
commit 95b3f62497
14 changed files with 47 additions and 47 deletions

View File

@ -14,7 +14,7 @@ Loop::run(function () {
$stream = $emitter->stream(); $stream = $emitter->stream();
$stream->listen(function ($value) { $stream->onEmit(function ($value) {
printf("Stream emitted %d\n", $value); printf("Stream emitted %d\n", $value);
return new Pause(500); // Artificial back-pressure on stream. return new Pause(500); // Artificial back-pressure on stream.
}); });

View File

@ -32,6 +32,6 @@ final class Failure implements Stream {
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function listen(callable $onNext) { public function onEmit(callable $onEmit) {
} }
} }

View File

@ -26,14 +26,14 @@ trait Producer {
private $listeners = []; private $listeners = [];
/** /**
* @param callable $onNext * @param callable $onEmit
*/ */
public function listen(callable $onNext) { public function onEmit(callable $onEmit) {
if ($this->resolved) { if ($this->resolved) {
return; return;
} }
$this->listeners[] = $onNext; $this->listeners[] = $onEmit;
} }
/** /**
@ -79,9 +79,9 @@ trait Producer {
$promises = []; $promises = [];
foreach ($this->listeners as $onNext) { foreach ($this->listeners as $onEmit) {
try { try {
$result = $onNext($value); $result = $onEmit($value);
if ($result instanceof ReactPromise) { if ($result instanceof ReactPromise) {
$result = adapt($result); $result = adapt($result);
} }

View File

@ -48,7 +48,7 @@ class Listener implements Iterator {
$backPressure = &$this->backPressure; $backPressure = &$this->backPressure;
$resolved = &$this->resolved; $resolved = &$this->resolved;
$this->stream->listen(static function ($value) use (&$waiting, &$values, &$backPressure, &$resolved) { $this->stream->onEmit(static function ($value) use (&$waiting, &$values, &$backPressure, &$resolved) {
$values[] = $value; $values[] = $value;
$backPressure[] = $pressure = new Deferred; $backPressure[] = $pressure = new Deferred;

View File

@ -12,10 +12,10 @@ interface Stream extends Promise {
* Registers a callback to be invoked each time value is emitted from the stream. If the function returns an * Registers a callback to be invoked each time value is emitted from the stream. If the function returns an
* promise, back-pressure is applied to the promise until the returned promise is resolved. * promise, back-pressure is applied to the promise until the returned promise is resolved.
* *
* Exceptions thrown from $onNext (or failures of promises returned from $onNext) will fail the returned * Exceptions thrown from $onEmit (or failures of promises returned from $onNext) will fail the returned
* Subscriber with the thrown exception. * Subscriber with the thrown exception.
* *
* @param callable $onNext Function invoked each time a value is emitted from the stream. * @param callable $onEmit Function invoked each time a value is emitted from the stream.
*/ */
public function listen(callable $onNext); public function onEmit(callable $onEmit);
} }

View File

@ -41,6 +41,6 @@ final class Success implements Stream {
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function listen(callable $onNext) { public function onEmit(callable $onEmit) {
} }
} }

View File

@ -609,21 +609,21 @@ namespace Amp\Stream {
/** /**
* @param \Amp\Stream $stream * @param \Amp\Stream $stream
* @param callable (mixed $value): mixed $onNext * @param callable (mixed $value): mixed $onEmit
* @param callable (mixed $value): mixed|null $onComplete * @param callable (mixed $value): mixed|null $onResolve
* *
* @return \Amp\Stream * @return \Amp\Stream
*/ */
function map(Stream $stream, callable $onNext, callable $onComplete = null): Stream { function map(Stream $stream, callable $onEmit, callable $onResolve = null): Stream {
$listener = new Listener($stream); $listener = new Listener($stream);
return new Producer(function (callable $emit) use ($listener, $onNext, $onComplete) { return new Producer(function (callable $emit) use ($listener, $onEmit, $onResolve) {
while (yield $listener->advance()) { while (yield $listener->advance()) {
yield $emit($onNext($listener->getCurrent())); yield $emit($onEmit($listener->getCurrent()));
} }
if ($onComplete === null) { if ($onResolve === null) {
return $listener->getResult(); return $listener->getResult();
} }
return $onComplete($listener->getResult()); return $onResolve($listener->getResult());
}); });
} }
@ -660,7 +660,7 @@ namespace Amp\Stream {
if (!$stream instanceof Stream) { if (!$stream instanceof Stream) {
throw new UnionTypeError([Stream::class], $stream); throw new UnionTypeError([Stream::class], $stream);
} }
$stream->listen(function ($value) use (&$pending, $emitter) { $stream->onEmit(function ($value) use (&$pending, $emitter) {
if ($pending) { if ($pending) {
return $emitter->emit($value); return $emitter->emit($value);
} }
@ -723,7 +723,7 @@ namespace Amp\Stream {
yield $emitter->emit($value); yield $emitter->emit($value);
}; };
$subscriptions[] = $stream->listen(function ($value) use ($generator) { $subscriptions[] = $stream->onEmit(function ($value) use ($generator) {
return new Coroutine($generator($value)); return new Coroutine($generator($value));
}); });
$previous[] = $stream; $previous[] = $stream;

View File

@ -50,7 +50,7 @@ class ConcatTest extends \PHPUnit\Framework\TestCase {
$stream = Stream\concat([Stream\fromIterable(\range(1, 5)), $producer, Stream\fromIterable(\range(7, 10))]); $stream = Stream\concat([Stream\fromIterable(\range(1, 5)), $producer, Stream\fromIterable(\range(7, 10))]);
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });

View File

@ -42,7 +42,7 @@ class FilterTest extends \PHPUnit\Framework\TestCase {
return $value & 1; return $value & 1;
}); });
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });
@ -72,7 +72,7 @@ class FilterTest extends \PHPUnit\Framework\TestCase {
throw $exception; throw $exception;
}); });
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });

View File

@ -33,7 +33,7 @@ class IntervalTest extends \PHPUnit\Framework\TestCase {
Loop::run(function () use (&$invoked, $count) { Loop::run(function () use (&$invoked, $count) {
$stream = Stream\interval(self::TIMEOUT, $count); $stream = Stream\interval(self::TIMEOUT, $count);
$stream->listen(function () use (&$invoked) { $stream->onEmit(function () use (&$invoked) {
++$invoked; ++$invoked;
return new Pause(self::TIMEOUT * 2); return new Pause(self::TIMEOUT * 2);
}); });

View File

@ -37,7 +37,7 @@ class ProducerTest extends TestCase {
$this->assertSame($emitted, $value); $this->assertSame($emitted, $value);
}; };
$producer->listen($callback); $producer->onEmit($callback);
$producer->onResolve(function ($exception, $result) use ($value) { $producer->onResolve(function ($exception, $result) use ($value) {
$this->assertSame($result, $value); $this->assertSame($result, $value);
@ -66,7 +66,7 @@ class ProducerTest extends TestCase {
$this->assertSame($emitted, $value); $this->assertSame($emitted, $value);
}; };
$producer->listen($callback); $producer->onEmit($callback);
$deferred->resolve($value); $deferred->resolve($value);
}); });
@ -108,7 +108,7 @@ class ProducerTest extends TestCase {
$time = microtime(true) - $time; $time = microtime(true) - $time;
}); });
$producer->listen(function () { $producer->onEmit(function () {
return new Pause(self::TIMEOUT); return new Pause(self::TIMEOUT);
}); });
}); });
@ -130,7 +130,7 @@ class ProducerTest extends TestCase {
$time = microtime(true) - $time; $time = microtime(true) - $time;
}); });
$producer->listen(function () { $producer->onEmit(function () {
return new ReactPromise(function ($resolve) { return new ReactPromise(function ($resolve) {
Loop::delay(self::TIMEOUT, $resolve); Loop::delay(self::TIMEOUT, $resolve);
}); });
@ -153,7 +153,7 @@ class ProducerTest extends TestCase {
yield $emit(2); yield $emit(2);
}); });
$producer->listen(function () use ($exception) { $producer->onEmit(function () use ($exception) {
throw $exception; throw $exception;
}); });
}); });
@ -192,7 +192,7 @@ class ProducerTest extends TestCase {
yield $producer; yield $producer;
$producer->listen(function () use (&$invoked) { $producer->onEmit(function () use (&$invoked) {
$invoked = true; $invoked = true;
}); });
}); });

View File

@ -35,7 +35,7 @@ class ProducerTraitTest extends TestCase {
$this->assertSame($emitted, $value); $this->assertSame($emitted, $value);
}; };
$this->producer->listen($callback); $this->producer->onEmit($callback);
$promise = $this->producer->emit($value); $promise = $this->producer->emit($value);
$this->assertInstanceOf(Promise::class, $promise); $this->assertInstanceOf(Promise::class, $promise);
@ -55,7 +55,7 @@ class ProducerTraitTest extends TestCase {
$this->assertSame($emitted, $value); $this->assertSame($emitted, $value);
}; };
$this->producer->listen($callback); $this->producer->onEmit($callback);
$this->producer->emit($promise); $this->producer->emit($promise);
$this->assertTrue($invoked); $this->assertTrue($invoked);
@ -73,7 +73,7 @@ class ProducerTraitTest extends TestCase {
$invoked = true; $invoked = true;
}; };
$this->producer->listen($callback); $this->producer->onEmit($callback);
$this->producer->emit($promise); $this->producer->emit($promise);
$this->assertFalse($invoked); $this->assertFalse($invoked);
@ -100,7 +100,7 @@ class ProducerTraitTest extends TestCase {
$this->assertSame($emitted, $value); $this->assertSame($emitted, $value);
}; };
$this->producer->listen($callback); $this->producer->onEmit($callback);
$this->producer->emit($deferred->promise()); $this->producer->emit($deferred->promise());
$this->assertFalse($invoked); $this->assertFalse($invoked);
@ -123,7 +123,7 @@ class ProducerTraitTest extends TestCase {
$this->assertSame($emitted, $value); $this->assertSame($emitted, $value);
}; };
$this->producer->listen($callback); $this->producer->onEmit($callback);
$this->producer->emit($promise); $this->producer->emit($promise);
$this->assertTrue($invoked); $this->assertTrue($invoked);
@ -141,7 +141,7 @@ class ProducerTraitTest extends TestCase {
$result = $emitted; $result = $emitted;
}; };
$this->producer->listen($callback); $this->producer->onEmit($callback);
$this->producer->emit($deferred->promise()); $this->producer->emit($deferred->promise());
$this->assertFalse($invoked); $this->assertFalse($invoked);
@ -215,7 +215,7 @@ class ProducerTraitTest extends TestCase {
try { try {
Loop::run(function () use ($exception) { Loop::run(function () use ($exception) {
$this->producer->listen(function () use ($exception) { $this->producer->onEmit(function () use ($exception) {
throw $exception; throw $exception;
}); });
@ -231,7 +231,7 @@ class ProducerTraitTest extends TestCase {
$value = 1; $value = 1;
$promise = new Success($value); $promise = new Success($value);
$this->producer->listen(function () use ($promise) { $this->producer->onEmit(function () use ($promise) {
return $promise; return $promise;
}); });
@ -249,7 +249,7 @@ class ProducerTraitTest extends TestCase {
try { try {
Loop::run(function () use ($exception, $promise) { Loop::run(function () use ($exception, $promise) {
$this->producer->listen(function () use ($promise) { $this->producer->onEmit(function () use ($promise) {
return $promise; return $promise;
}); });

View File

@ -14,7 +14,7 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase {
Loop::run(function () use (&$results) { Loop::run(function () use (&$results) {
$stream = Stream\fromIterable([new Success(1), new Success(2), new Success(3)]); $stream = Stream\fromIterable([new Success(1), new Success(2), new Success(3)]);
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });
}); });
@ -43,7 +43,7 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase {
Loop::run(function () use (&$results, &$reason, $exception) { Loop::run(function () use (&$results, &$reason, $exception) {
$stream = Stream\fromIterable([new Success(1), new Success(2), new Failure($exception), new Success(4)]); $stream = Stream\fromIterable([new Success(1), new Success(2), new Failure($exception), new Success(4)]);
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });
@ -63,7 +63,7 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase {
Loop::run(function () use (&$results) { Loop::run(function () use (&$results) {
$stream = Stream\fromIterable([new Pause(30, 1), new Pause(10, 2), new Pause(20, 3), new Success(4)]); $stream = Stream\fromIterable([new Pause(30, 1), new Pause(10, 2), new Pause(20, 3), new Success(4)]);
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });
}); });
@ -82,7 +82,7 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase {
$stream = Stream\fromIterable($generator); $stream = Stream\fromIterable($generator);
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });
}); });

View File

@ -46,7 +46,7 @@ class StreamMapTest extends \PHPUnit\Framework\TestCase {
return $value + 1; return $value + 1;
}); });
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });
@ -77,7 +77,7 @@ class StreamMapTest extends \PHPUnit\Framework\TestCase {
throw $exception; throw $exception;
}); });
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });
@ -113,7 +113,7 @@ class StreamMapTest extends \PHPUnit\Framework\TestCase {
throw $exception; throw $exception;
}); });
$stream->listen(function ($value) use (&$results) { $stream->onEmit(function ($value) use (&$results) {
$results[] = $value; $results[] = $value;
}); });