1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Add Emitter and Observer tests

This commit is contained in:
Aaron Piotrowski 2016-12-13 15:25:52 -06:00
parent 1bbcadaedd
commit c73ca4e427
2 changed files with 317 additions and 0 deletions

135
test/EmitterTest.php Normal file
View File

@ -0,0 +1,135 @@
<?php declare(strict_types = 1);
namespace Amp\Test;
use Amp;
use Amp\{ Deferred, Emitter, Pause };
class EmitterTest extends \PHPUnit_Framework_TestCase {
const TIMEOUT = 100;
/**
* @expectedException \Error
* @expectedExceptionMessage The callable did not return a Generator
*/
public function testNonGeneratorCallable() {
$emitter = new Emitter(function () {});
}
public function testEmit() {
$invoked = false;
Amp\execute(function () use (&$invoked) {
$value = 1;
$emitter = new Emitter(function (callable $emit) use ($value) {
yield $emit($value);
return $value;
});
$invoked = false;
$callback = function ($emitted) use (&$invoked, $value) {
$invoked = true;
$this->assertSame($emitted, $value);
};
$emitter->subscribe($callback);
$emitter->when(function ($exception, $result) use ($value) {
$this->assertSame($result, $value);
});
});
$this->assertTrue($invoked);
}
/**
* @depends testEmit
*/
public function testEmitSuccessfulPromise() {
$invoked = false;
Amp\execute(function () use (&$invoked) {
$deferred = new Deferred();
$emitter = new Emitter(function (callable $emit) use ($deferred) {
return yield $emit($deferred->promise());
});
$value = 1;
$invoked = false;
$callback = function ($emitted) use (&$invoked, $value) {
$invoked = true;
$this->assertSame($emitted, $value);
};
$emitter->subscribe($callback);
$deferred->resolve($value);
});
$this->assertTrue($invoked);
}
/**
* @depends testEmitSuccessfulPromise
*/
public function testEmitFailedPromise() {
$exception = new \Exception;
Amp\execute(function () use ($exception) {
$deferred = new Deferred();
$emitter = new Emitter(function (callable $emit) use ($deferred) {
return yield $emit($deferred->promise());
});
$deferred->fail($exception);
$emitter->when(function ($reason) use ($exception) {
$this->assertSame($reason, $exception);
});
});
}
/**
* @depends testEmit
*/
public function testEmitBackPressure() {
$emits = 3;
Amp\execute(function () use (&$time, $emits) {
$emitter = new Emitter(function (callable $emit) use (&$time, $emits) {
$time = microtime(true);
for ($i = 0; $i < $emits; ++$i) {
yield $emit($i);
}
$time = microtime(true) - $time;
});
$emitter->subscribe(function () {
return new Pause(self::TIMEOUT);
});
});
$this->assertGreaterThan(self::TIMEOUT * $emits, $time * 1000);
}
/**
* @depends testEmit
*/
public function testSubscriberThrows() {
$exception = new \Exception;
try {
Amp\execute(function () use ($exception) {
$emitter = new Emitter(function (callable $emit) {
yield $emit(1);
yield $emit(2);
});
$emitter->subscribe(function () use ($exception) {
throw $exception;
});
});
} catch (\Exception $caught) {
$this->assertSame($exception, $caught);
}
}
}

182
test/ObserverTest.php Normal file
View File

@ -0,0 +1,182 @@
<?php declare(strict_types = 1);
namespace Amp\Test;
use Amp;
use Amp\{ Emitter, Observer, Pause, Postponed };
class ObserverTest extends \PHPUnit_Framework_TestCase {
const TIMEOUT = 10;
public function testSingleEmittingObservable() {
Amp\execute(function () {
$value = 1;
$observable = new Emitter(function (callable $emit) use ($value) {
yield $emit($value);
return $value;
});
$observer = new Observer($observable);
while (yield $observer->advance()) {
$this->assertSame($observer->getCurrent(), $value);
}
$this->assertSame($observer->getResult(), $value);
});
}
/**
* @depends testSingleEmittingObservable
*/
public function testFastEmittingObservable() {
Amp\execute(function () {
$count = 10;
$postponed = new Postponed;
$observer = new Observer($postponed->observe());
for ($i = 0; $i < $count; ++$i) {
$promises[] = $postponed->emit($i);
}
$postponed->resolve($i);
for ($i = 0; yield $observer->advance(); ++$i) {
$this->assertSame($observer->getCurrent(), $i);
}
$this->assertSame($count, $i);
$this->assertSame($observer->getResult(), $i);
});
}
/**
* @depends testSingleEmittingObservable
*/
public function testSlowEmittingObservable() {
Amp\execute(function () {
$count = 10;
$observable = new Emitter(function (callable $emit) use ($count) {
for ($i = 0; $i < $count; ++$i) {
yield new Pause(self::TIMEOUT);
yield $emit($i);
}
return $i;
});
$observer = new Observer($observable);
for ($i = 0; yield $observer->advance(); ++$i) {
$this->assertSame($observer->getCurrent(), $i);
}
$this->assertSame($count, $i);
$this->assertSame($observer->getResult(), $i);
});
}
/**
* @depends testFastEmittingObservable
*/
public function testDrain() {
Amp\execute(function () {
$count = 10;
$postponed = new Postponed;
$observer = new Observer($postponed->observe());
for ($i = 0; $i < $count; ++$i) {
$promises[] = $postponed->emit($i);
}
$postponed->resolve($i);
$values = $observer->drain();
$this->assertSame(\range(0, $count - 1), $values);
});
}
/**
* @expectedException \Error
* @expectedExceptionMessage The observable has not resolved
*/
public function testDrainBeforeResolution() {
$postponed = new Postponed;
$observer = new Observer($postponed->observe());
$observer->drain();
}
public function testFailingObservable() {
Amp\execute(function () {
$exception = new \Exception;
$postponed = new Postponed;
$observer = new Observer($postponed->observe());
$postponed->fail($exception);
try {
while (yield $observer->advance());
$this->fail("Observer::advance() should throw observable failure reason");
} catch (\Exception $reason) {
$this->assertSame($exception, $reason);
}
try {
$result = $observer->getResult();
$this->fail("Observer::getResult() should throw observable failure reason");
} catch (\Exception $reason) {
$this->assertSame($exception, $reason);
}
});
}
/**
* @expectedException \Error
* @expectedExceptionMessage Promise returned from advance() must resolve before calling this method
*/
public function testGetCurrentBeforeAdvanceResolves() {
$postponed = new Postponed;
$observer = new Observer($postponed->observe());
$promise = $observer->advance();
$observer->getCurrent();
}
/**
* @expectedException \Error
* @expectedExceptionMessage The observable has resolved
*/
public function testGetCurrentAfterResolution() {
$postponed = new Postponed;
$observer = new Observer($postponed->observe());
$postponed->resolve();
$observer->getCurrent();
}
/**
* @expectedException \Error
* @expectedExceptionMessage The observable has not resolved
*/
public function testGetResultBeforeResolution() {
Amp\execute(function () {
$postponed = new Postponed;
$observer = new Observer($postponed->observe());
$observer->getResult();
});
}
}