1
0
mirror of https://github.com/danog/amp.git synced 2024-11-27 12:35:02 +01:00
amp/test/ListenerTest.php

253 lines
6.6 KiB
PHP
Raw Normal View History

<?php
2016-12-13 22:25:52 +01:00
namespace Amp\Test;
use Amp;
2017-01-04 02:10:27 +01:00
use Amp\{ Producer, Listener, Pause, Emitter };
use AsyncInterop\Loop;
2016-12-13 22:25:52 +01:00
2017-01-04 02:10:27 +01:00
class ListenerTest extends \PHPUnit_Framework_TestCase {
2016-12-13 22:25:52 +01:00
const TIMEOUT = 10;
2017-01-04 02:10:27 +01:00
public function testSingleEmittingStream() {
2016-12-29 23:57:08 +01:00
Loop::execute(Amp\wrap(function () {
2016-12-13 22:25:52 +01:00
$value = 1;
2017-01-04 02:10:27 +01:00
$stream = new Producer(function (callable $emit) use ($value) {
2016-12-13 22:25:52 +01:00
yield $emit($value);
return $value;
});
2017-01-04 02:10:27 +01:00
$listener = new Listener($stream);
2017-01-04 02:10:27 +01:00
while (yield $listener->advance()) {
$this->assertSame($listener->getCurrent(), $value);
2016-12-13 22:25:52 +01:00
}
2017-01-04 02:10:27 +01:00
$this->assertSame($listener->getResult(), $value);
2016-12-29 23:57:08 +01:00
}));
2016-12-13 22:25:52 +01:00
}
2016-12-13 22:25:52 +01:00
/**
2017-01-04 02:10:27 +01:00
* @depends testSingleEmittingStream
2016-12-13 22:25:52 +01:00
*/
2017-01-04 02:10:27 +01:00
public function testFastEmittingStream() {
2016-12-29 23:57:08 +01:00
Loop::execute(Amp\wrap(function () {
2016-12-13 22:25:52 +01:00
$count = 10;
2017-01-04 02:10:27 +01:00
$emitter = new Emitter;
2017-01-04 02:10:27 +01:00
$listener = new Listener($emitter->stream());
2016-12-13 22:25:52 +01:00
for ($i = 0; $i < $count; ++$i) {
2017-01-04 02:10:27 +01:00
$promises[] = $emitter->emit($i);
2016-12-13 22:25:52 +01:00
}
2017-01-04 02:10:27 +01:00
$emitter->resolve($i);
2017-01-04 02:10:27 +01:00
for ($i = 0; yield $listener->advance(); ++$i) {
$this->assertSame($listener->getCurrent(), $i);
2016-12-13 22:25:52 +01:00
}
2016-12-13 22:25:52 +01:00
$this->assertSame($count, $i);
2017-01-04 02:10:27 +01:00
$this->assertSame($listener->getResult(), $i);
2016-12-29 23:57:08 +01:00
}));
2016-12-13 22:25:52 +01:00
}
2016-12-13 22:25:52 +01:00
/**
2017-01-04 02:10:27 +01:00
* @depends testSingleEmittingStream
2016-12-13 22:25:52 +01:00
*/
2017-01-04 02:10:27 +01:00
public function testSlowEmittingStream() {
2016-12-29 23:57:08 +01:00
Loop::execute(Amp\wrap(function () {
2016-12-13 22:25:52 +01:00
$count = 10;
2017-01-04 02:10:27 +01:00
$stream = new Producer(function (callable $emit) use ($count) {
2016-12-13 22:25:52 +01:00
for ($i = 0; $i < $count; ++$i) {
yield new Pause(self::TIMEOUT);
yield $emit($i);
}
return $i;
});
2017-01-04 02:10:27 +01:00
$listener = new Listener($stream);
2017-01-04 02:10:27 +01:00
for ($i = 0; yield $listener->advance(); ++$i) {
$this->assertSame($listener->getCurrent(), $i);
2016-12-13 22:25:52 +01:00
}
2016-12-13 22:25:52 +01:00
$this->assertSame($count, $i);
2017-01-04 02:10:27 +01:00
$this->assertSame($listener->getResult(), $i);
2016-12-29 23:57:08 +01:00
}));
2016-12-13 22:25:52 +01:00
}
2016-12-13 22:25:52 +01:00
/**
2017-01-04 02:10:27 +01:00
* @depends testFastEmittingStream
2016-12-13 22:25:52 +01:00
*/
public function testDrain() {
2016-12-29 23:57:08 +01:00
Loop::execute(Amp\wrap(function () {
2016-12-13 22:25:52 +01:00
$count = 10;
$expected = \range(0, $count - 1);
2017-01-04 02:10:27 +01:00
$emitter = new Emitter;
2017-01-04 02:10:27 +01:00
$listener = new Listener($emitter->stream());
2016-12-13 22:25:52 +01:00
for ($i = 0; $i < $count; ++$i) {
2017-01-04 02:10:27 +01:00
$promises[] = $emitter->emit($i);
2016-12-13 22:25:52 +01:00
}
$value = null;
if (yield $listener->advance()) {
$value = $listener->getCurrent();
}
$this->assertSame(reset($expected), $value);
unset($expected[0]);
2017-01-04 02:10:27 +01:00
$emitter->resolve($i);
2017-01-04 02:10:27 +01:00
$values = $listener->drain();
$this->assertSame($expected, $values);
2016-12-29 23:57:08 +01:00
}));
2016-12-13 22:25:52 +01:00
}
2016-12-13 22:25:52 +01:00
/**
* @expectedException \Error
2017-01-04 02:10:27 +01:00
* @expectedExceptionMessage The stream has not resolved
2016-12-13 22:25:52 +01:00
*/
public function testDrainBeforeResolution() {
2017-01-04 02:10:27 +01:00
$emitter = new Emitter;
2017-01-04 02:10:27 +01:00
$listener = new Listener($emitter->stream());
2017-01-04 02:10:27 +01:00
$listener->drain();
2016-12-13 22:25:52 +01:00
}
2017-01-04 02:10:27 +01:00
public function testFailingStream() {
2016-12-29 23:57:08 +01:00
Loop::execute(Amp\wrap(function () {
2016-12-13 22:25:52 +01:00
$exception = new \Exception;
2017-01-04 02:10:27 +01:00
$emitter = new Emitter;
2017-01-04 02:10:27 +01:00
$listener = new Listener($emitter->stream());
2017-01-04 02:10:27 +01:00
$emitter->fail($exception);
2016-12-13 22:25:52 +01:00
try {
2017-01-04 02:10:27 +01:00
while (yield $listener->advance());
$this->fail("Listener::advance() should throw stream failure reason");
2016-12-13 22:25:52 +01:00
} catch (\Exception $reason) {
$this->assertSame($exception, $reason);
}
2016-12-13 22:25:52 +01:00
try {
2017-01-04 02:10:27 +01:00
$result = $listener->getResult();
$this->fail("Listener::getResult() should throw stream failure reason");
2016-12-13 22:25:52 +01:00
} catch (\Exception $reason) {
$this->assertSame($exception, $reason);
}
2016-12-29 23:57:08 +01:00
}));
2016-12-13 22:25:52 +01:00
}
2016-12-13 22:25:52 +01:00
/**
* @expectedException \Error
* @expectedExceptionMessage Promise returned from advance() must resolve before calling this method
*/
public function testGetCurrentBeforeAdvanceResolves() {
2017-01-04 02:10:27 +01:00
$emitter = new Emitter;
2017-01-04 02:10:27 +01:00
$listener = new Listener($emitter->stream());
2017-01-04 02:10:27 +01:00
$promise = $listener->advance();
2017-01-04 02:10:27 +01:00
$listener->getCurrent();
2016-12-13 22:25:52 +01:00
}
2016-12-13 22:25:52 +01:00
/**
* @expectedException \Error
2017-01-04 02:10:27 +01:00
* @expectedExceptionMessage The stream has resolved
2016-12-13 22:25:52 +01:00
*/
public function testGetCurrentAfterResolution() {
2017-01-04 02:10:27 +01:00
$emitter = new Emitter;
2017-01-04 02:10:27 +01:00
$listener = new Listener($emitter->stream());
2017-01-04 02:10:27 +01:00
$emitter->resolve();
2017-01-04 02:10:27 +01:00
$listener->getCurrent();
2016-12-13 22:25:52 +01:00
}
2016-12-13 22:25:52 +01:00
/**
* @expectedException \Error
2017-01-04 02:10:27 +01:00
* @expectedExceptionMessage The stream has not resolved
2016-12-13 22:25:52 +01:00
*/
public function testGetResultBeforeResolution() {
2016-12-29 23:57:08 +01:00
Loop::execute(Amp\wrap(function () {
2017-01-04 02:10:27 +01:00
$emitter = new Emitter;
2017-01-04 02:10:27 +01:00
$listener = new Listener($emitter->stream());
2017-01-04 02:10:27 +01:00
$listener->getResult();
2016-12-29 23:57:08 +01:00
}));
2016-12-13 22:25:52 +01:00
}
/**
* @expectedException \Error
* @expectedExceptionMessage The prior promise returned must resolve before invoking this method again
*/
public function testConsecutiveAdvanceCalls() {
$emitter = new Emitter;
$listener = new Listener($emitter->stream());
$listener->advance();
$listener->advance();
}
public function testListenerDestroyedAfterEmits() {
$emitter = new Emitter;
$listener = new Listener($emitter->stream());
$promise = $emitter->emit(1);
unset($listener);
$invoked = false;
$promise->when(function () use (&$invoked) {
$invoked = true;
});
$this->assertTrue($invoked);
}
public function testListenerDestroyedThenStreamEmits() {
$emitter = new Emitter;
$listener = new Listener($emitter->stream());
$emitter->emit(1);
unset($listener);
$promise = $emitter->emit(2);
$invoked = false;
$promise->when(function () use (&$invoked) {
$invoked = true;
});
$this->assertTrue($invoked);
}
public function testStreamFailsWhenListenerWaiting() {
$exception = new \Exception;
$emitter = new Emitter;
$listener = new Listener($emitter->stream());
$promise = $listener->advance();
$promise->when(function ($exception, $value) use (&$reason) {
$reason = $exception;
});
$emitter->fail($exception);
$this->assertSame($exception, $reason);
}
2016-12-13 22:25:52 +01:00
}