1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Update stream tests

This commit is contained in:
Aaron Piotrowski 2017-04-27 10:32:53 -05:00 committed by Niklas Keller
parent a096a36f9a
commit 4992d3ebac
9 changed files with 253 additions and 722 deletions

View File

@ -561,8 +561,8 @@ namespace Amp\Stream {
$result = $emitter->stream();
$coroutine = coroutine(function (Stream $stream) use (&$emitter) {
while (yield $stream->advance() && $emitter !== null) {
$emitter->emit($stream->getCurrent());
while ((yield $stream->advance()) && $emitter !== null) {
yield $emitter->emit($stream->getCurrent());
}
});
@ -603,7 +603,6 @@ namespace Amp\Stream {
}
$emitter = new Emitter;
$subscriptions = [];
$previous = [];
$promise = Promise\all($previous);
@ -633,8 +632,7 @@ namespace Amp\Stream {
yield $emitter->emit($value);
});
$subscriptions[] = $coroutine($stream, $emit);
$previous[] = $stream;
$previous[] = $coroutine($stream, $emit);
$promise = Promise\all($previous);
}

View File

@ -22,27 +22,26 @@ class ConcatTest extends \PHPUnit\Framework\TestCase {
* @param array $expected
*/
public function testConcat(array $streams, array $expected) {
$streams = \array_map(function (array $stream): Stream {
return Stream\fromIterable($stream);
}, $streams);
Loop::run(function () use ($streams, $expected) {
$streams = \array_map(function (array $stream): Stream {
return Stream\fromIterable($stream);
}, $streams);
$stream = Stream\concat($streams);
$stream = Stream\concat($streams);
Stream\map($stream, function ($value) use ($expected) {
static $i = 0;
$this->assertSame($expected[$i++], $value);
while (yield $stream->advance()) {
$this->assertSame(\array_shift($expected), $stream->getCurrent());
}
});
Loop::run();
}
/**
* @depends testConcat
*/
public function testConcatWithFailedStream() {
$exception = new \Exception;
$results = [];
Loop::run(function () use (&$results, &$reason, $exception) {
Loop::run(function () {
$exception = new \Exception;
$expected = \range(1, 6);
$producer = new Producer(function (callable $emit) use ($exception) {
yield $emit(6); // Emit once before failing.
throw $exception;
@ -50,19 +49,16 @@ class ConcatTest extends \PHPUnit\Framework\TestCase {
$stream = Stream\concat([Stream\fromIterable(\range(1, 5)), $producer, Stream\fromIterable(\range(7, 10))]);
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
try {
while (yield $stream->advance()) {
$this->assertSame(\array_shift($expected), $stream->getCurrent());
}
} catch (\Throwable $reason) {
$this->assertSame($exception, $reason);
}
$callback = function ($exception, $value) use (&$reason) {
$reason = $exception;
};
$stream->onResolve($callback);
$this->assertEmpty($expected);
});
$this->assertSame(\range(1, 6), $results);
$this->assertSame($exception, $reason);
}
/**

View File

@ -19,18 +19,17 @@ class FilterTest extends \PHPUnit\Framework\TestCase {
$this->assertInstanceOf(Stream::class, $stream);
$emitter->resolve();
$emitter->complete();
});
$this->assertFalse($invoked);
}
public function testValuesEmitted() {
$count = 0;
$values = [1, 2, 3];
$results = [];
$expected = [1, 3];
Loop::run(function () use (&$results, &$result, &$count, $values) {
Loop::run(function () {
$count = 0;
$values = [1, 2, 3];
$expected = [1, 3];
$producer = new Producer(function (callable $emit) use ($values) {
foreach ($values as $value) {
yield $emit($value);
@ -42,26 +41,20 @@ class FilterTest extends \PHPUnit\Framework\TestCase {
return $value & 1;
});
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
$stream->onResolve(function ($exception, $value) use (&$result) {
$result = $value;
});
while (yield $stream->advance()) {
$this->assertSame(\array_shift($expected), $stream->getCurrent());
}
$this->assertSame(3, $count);
});
$this->assertSame(\count($values), $count);
$this->assertSame($expected, $results);
}
/**
* @depends testValuesEmitted
*/
public function testCallbackThrows() {
$values = [1, 2, 3];
$exception = new \Exception;
Loop::run(function () use (&$reason, $values, $exception) {
Loop::run(function () {
$values = [1, 2, 3];
$exception = new \Exception;
$producer = new Producer(function (callable $emit) use ($values) {
foreach ($values as $value) {
yield $emit($value);
@ -72,24 +65,21 @@ class FilterTest extends \PHPUnit\Framework\TestCase {
throw $exception;
});
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
$callback = function ($exception, $value) use (&$reason) {
$reason = $exception;
};
$stream->onResolve($callback);
try {
while (yield $stream->advance()) {
$stream->getCurrent();
}
} catch (\Exception $reason) {
$this->assertSame($reason, $exception);
}
});
$this->assertSame($exception, $reason);
}
public function testStreamFails() {
$invoked = false;
$exception = new \Exception;
Loop::run(function () use (&$invoked, &$reason, &$exception) {
Loop::run(function () {
$invoked = false;
$exception = new \Exception;
$emitter = new Emitter;
$stream = Stream\filter($emitter->stream(), function ($value) use (&$invoked) {
@ -98,14 +88,16 @@ class FilterTest extends \PHPUnit\Framework\TestCase {
$emitter->fail($exception);
$callback = function ($exception, $value) use (&$reason) {
$reason = $exception;
};
try {
while (yield $stream->advance()) {
$stream->getCurrent();
}
} catch (\Exception $reason) {
$this->assertSame($reason, $exception);
}
$stream->onResolve($callback);
$this->assertFalse($invoked);
});
$this->assertFalse($invoked);
$this->assertSame($exception, $reason);
}
}

View File

@ -19,53 +19,43 @@ class MapTest extends \PHPUnit\Framework\TestCase {
$this->assertInstanceOf(Stream::class, $stream);
$emitter->resolve();
$emitter->complete();
});
$this->assertFalse($invoked);
}
public function testValuesEmitted() {
$count = 0;
$values = [1, 2, 3];
$final = 4;
$results = [];
Loop::run(function () use (&$results, &$result, &$count, $values, $final) {
$producer = new Producer(function (callable $emit) use ($values, $final) {
Loop::run(function () {
$count = 0;
$values = [1, 2, 3];
$producer = new Producer(function (callable $emit) use ($values) {
foreach ($values as $value) {
yield $emit($value);
}
return $final;
});
$stream = Stream\map($producer, function ($value) use (&$count) {
++$count;
return $value + 1;
}, function ($value) use (&$invoked) {
return $value + 1;
});
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
while (yield $stream->advance()) {
$this->assertSame(\array_shift($values) + 1, $stream->getCurrent());
}
$stream->onResolve(function ($exception, $value) use (&$result) {
$result = $value;
});
$this->assertSame(3, $count);
});
$this->assertSame(\count($values), $count);
$this->assertSame(\array_map(function ($value) { return $value + 1; }, $values), $results);
$this->assertSame($final + 1, $result);
}
/**
* @depends testValuesEmitted
*/
public function testOnNextCallbackThrows() {
$values = [1, 2, 3];
$exception = new \Exception;
Loop::run(function () use (&$reason, $values, $exception) {
Loop::run(function () {
$values = [1, 2, 3];
$exception = new \Exception;
$producer = new Producer(function (callable $emit) use ($values) {
foreach ($values as $value) {
yield $emit($value);
@ -76,62 +66,20 @@ class MapTest extends \PHPUnit\Framework\TestCase {
throw $exception;
});
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
$callback = function ($exception, $value) use (&$reason) {
$reason = $exception;
};
$stream->onResolve($callback);
});
$this->assertSame($exception, $reason);
}
/**
* @depends testValuesEmitted
*/
public function testOnCompleteCallbackThrows() {
$count = 0;
$values = [1, 2, 3];
$results = [];
$exception = new \Exception;
Loop::run(function () use (&$reason, &$results, &$count, $values, $exception) {
$producer = new Producer(function (callable $emit) use ($values) {
foreach ($values as $value) {
yield $emit($value);
try {
while (yield $stream->advance()) {
$stream->getCurrent();
}
});
$stream = Stream\map($producer, function ($value) use (&$count) {
++$count;
return $value + 1;
}, function ($value) use ($exception) {
throw $exception;
});
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
$callback = function ($exception, $value) use (&$reason) {
$reason = $exception;
};
$stream->onResolve($callback);
} catch (\Exception $reason) {
$this->assertSame($reason, $exception);
}
});
$this->assertSame(\count($values), $count);
$this->assertSame(\array_map(function ($value) { return $value + 1; }, $values), $results);
$this->assertSame($exception, $reason);
}
public function testStreamFails() {
$invoked = false;
$exception = new \Exception;
Loop::run(function () use (&$invoked, &$reason, &$exception) {
Loop::run(function () {
$invoked = false;
$exception = new \Exception;
$emitter = new Emitter;
$stream = Stream\map($emitter->stream(), function ($value) use (&$invoked) {
@ -140,14 +88,15 @@ class MapTest extends \PHPUnit\Framework\TestCase {
$emitter->fail($exception);
$callback = function ($exception, $value) use (&$reason) {
$reason = $exception;
};
try {
while (yield $stream->advance()) {
$stream->getCurrent();
}
} catch (\Exception $reason) {
$this->assertSame($reason, $exception);
}
$stream->onResolve($callback);
$this->assertFalse($invoked);
});
$this->assertFalse($invoked);
$this->assertSame($exception, $reason);
}
}

View File

@ -3,6 +3,7 @@
namespace Amp\Test;
use Amp\Loop;
use Amp\Pause;
use Amp\Producer;
use Amp\Stream;
@ -22,26 +23,55 @@ class MergeTest extends \PHPUnit\Framework\TestCase {
* @param array $expected
*/
public function testMerge(array $streams, array $expected) {
$streams = \array_map(function (array $stream): Stream {
return Stream\fromIterable($stream);
}, $streams);
Loop::run(function () use ($streams, $expected) {
$streams = \array_map(function (array $stream): Stream {
return Stream\fromIterable($stream);
}, $streams);
$stream = Stream\merge($streams);
$stream = Stream\merge($streams);
Stream\map($stream, function ($value) use ($expected) {
static $i = 0;
$this->assertSame($expected[$i++], $value);
while (yield $stream->advance()) {
$this->assertSame(\array_shift($expected), $stream->getCurrent());
}
});
}
Loop::run();
/**
* @depends testMerge
*/
public function testMergeWithDelayedEmits() {
Loop::run(function () {
$streams = [];
$values1 = [new Pause(10, 1), new Pause(50, 2), new Pause(70, 3)];
$values2 = [new Pause(20, 4), new Pause(40, 5), new Pause(60, 6)];
$expected = [1, 4, 5, 2, 6, 3];
$streams[] = new Producer(function (callable $emit) use ($values1) {
foreach ($values1 as $value) {
yield $emit($value);
}
});
$streams[] = new Producer(function (callable $emit) use ($values2) {
foreach ($values2 as $value) {
yield $emit($value);
}
});
$stream = Stream\merge($streams);
while (yield $stream->advance()) {
$this->assertSame(\array_shift($expected), $stream->getCurrent());
}
});
}
/**
* @depends testMerge
*/
public function testMergeWithFailedStream() {
$exception = new \Exception;
Loop::run(function () use (&$reason, $exception) {
Loop::run(function () {
$exception = new \Exception;
$producer = new Producer(function (callable $emit) use ($exception) {
yield $emit(1); // Emit once before failing.
throw $exception;
@ -49,14 +79,12 @@ class MergeTest extends \PHPUnit\Framework\TestCase {
$stream = Stream\merge([$producer, Stream\fromIterable(\range(1, 5))]);
$callback = function ($exception, $value) use (&$reason) {
$reason = $exception;
};
$stream->onResolve($callback);
try {
while (yield $stream->advance());
} catch (\Throwable $reason) {
$this->assertSame($exception, $reason);
}
});
$this->assertSame($exception, $reason);
}
/**

View File

@ -22,56 +22,35 @@ class ProducerTest extends TestCase {
}
public function testEmit() {
$invoked = false;
Loop::run(function () use (&$invoked) {
Loop::run(function () {
$value = 1;
$producer = new Producer(function (callable $emit) use ($value) {
yield $emit($value);
return $value;
});
$invoked = false;
$callback = function ($emitted) use (&$invoked, $value) {
$invoked = true;
$this->assertSame($emitted, $value);
};
$producer->onEmit($callback);
$producer->onResolve(function ($exception, $result) use ($value) {
$this->assertSame($result, $value);
});
$this->assertTrue(yield $producer->advance());
$this->assertSame($producer->getCurrent(), $value);
});
$this->assertTrue($invoked);
}
/**
* @depends testEmit
*/
public function testEmitSuccessfulPromise() {
$invoked = false;
Loop::run(function () use (&$invoked) {
Loop::run(function () {
$deferred = new Deferred();
$producer = new Producer(function (callable $emit) use ($deferred) {
return yield $emit($deferred->promise());
yield $emit($deferred->promise());
});
$value = 1;
$invoked = false;
$callback = function ($emitted) use (&$invoked, $value) {
$invoked = true;
$this->assertSame($emitted, $value);
};
$producer->onEmit($callback);
$deferred->resolve($value);
});
$this->assertTrue($invoked);
$this->assertTrue(yield $producer->advance());
$this->assertSame($producer->getCurrent(), $value);
});
}
/**
@ -88,9 +67,11 @@ class ProducerTest extends TestCase {
$deferred->fail($exception);
$producer->onResolve(function ($reason) use ($exception) {
try {
yield $producer->advance();
} catch (\Exception $reason) {
$this->assertSame($reason, $exception);
});
}
});
}
@ -108,60 +89,14 @@ class ProducerTest extends TestCase {
$time = microtime(true) - $time;
});
$producer->onEmit(function () {
return new Delayed(self::TIMEOUT);
});
while (yield $producer->advance()) {
yield new Delayed(self::TIMEOUT);
}
});
$this->assertGreaterThan(self::TIMEOUT * $emits - 1 /* 1ms grace period */, $time * 1000);
}
/**
* @depends testEmit
*/
public function testEmitReactBackPressure() {
$emits = 3;
Loop::run(function () use (&$time, $emits) {
$producer = new Producer(function (callable $emit) use (&$time, $emits) {
$time = microtime(true);
for ($i = 0; $i < $emits; ++$i) {
yield $emit($i);
}
$time = microtime(true) - $time;
});
$producer->onEmit(function () {
return new ReactPromise(function ($resolve) {
Loop::delay(self::TIMEOUT, $resolve);
});
});
});
$this->assertGreaterThan(self::TIMEOUT * $emits - 1 /* 1ms grace period */, $time * 1000);
}
/**
* @depends testEmit
*/
public function testSubscriberThrows() {
$exception = new \Exception;
try {
Loop::run(function () use ($exception) {
$producer = new Producer(function (callable $emit) {
yield $emit(1);
yield $emit(2);
});
$producer->onEmit(function () use ($exception) {
throw $exception;
});
});
} catch (\Exception $caught) {
$this->assertSame($exception, $caught);
}
}
/**
* @depends testEmit
*/
@ -175,28 +110,10 @@ class ProducerTest extends TestCase {
throw $exception;
});
Amp\Promise\wait($producer);
while (yield $producer->advance());
});
} catch (\Exception $caught) {
$this->assertSame($exception, $caught);
}
}
public function testListenAfterResolve() {
$invoked = false;
Loop::run(function () use (&$invoked) {
$producer = new Producer(function (callable $emit) use (&$invoked) {
yield $emit(1);
});
yield $producer;
$producer->onEmit(function () use (&$invoked) {
$invoked = true;
});
});
$this->assertFalse($invoked);
}
}

View File

@ -13,7 +13,7 @@ use React\Promise\FulfilledPromise as FulfilledReactPromise;
class Producer {
use \Amp\Internal\Producer {
emit as public;
resolve as public;
complete as public;
fail as public;
}
}
@ -27,155 +27,130 @@ class ProducerTraitTest extends TestCase {
}
public function testEmit() {
$invoked = false;
$value = 1;
Loop::run(function () {
$value = 1;
$callback = function ($emitted) use (&$invoked, $value) {
$invoked = true;
$this->assertSame($emitted, $value);
};
$promise = $this->producer->emit($value);
$this->producer->onEmit($callback);
$promise = $this->producer->emit($value);
$this->assertTrue(yield $this->producer->advance());
$this->assertSame($value, $this->producer->getCurrent());
$this->assertInstanceOf(Promise::class, $promise);
$this->assertTrue($invoked);
$this->assertInstanceOf(Promise::class, $promise);
});
}
/**
* @depends testEmit
*/
public function testEmitSuccessfulPromise() {
$invoked = false;
$value = 1;
$promise = new Success($value);
Loop::run(function () {
$value = 1;
$promise = new Success($value);
$callback = function ($emitted) use (&$invoked, $value) {
$invoked = true;
$this->assertSame($emitted, $value);
};
$promise = $this->producer->emit($promise);
$this->producer->onEmit($callback);
$this->producer->emit($promise);
$this->assertTrue(yield $this->producer->advance());
$this->assertSame($value, $this->producer->getCurrent());
$this->assertTrue($invoked);
$this->assertInstanceOf(Promise::class, $promise);
});
}
/**
* @depends testEmit
*/
public function testEmitFailedPromise() {
$invoked = false;
$exception = new \Exception;
$promise = new Failure($exception);
Loop::run(function () {
$exception = new \Exception;
$promise = new Failure($exception);
$callback = function ($emitted) use (&$invoked) {
$invoked = true;
};
$promise = $this->producer->emit($promise);
$this->producer->onEmit($callback);
$this->producer->emit($promise);
try {
$this->assertTrue(yield $this->producer->advance());
} catch (\Exception $reason) {
$this->assertSame($reason, $exception);
}
$this->assertFalse($invoked);
$this->producer->onResolve(function ($exception) use (&$invoked, &$reason) {
$invoked = true;
$reason = $exception;
$this->assertInstanceOf(Promise::class, $promise);
});
$this->assertTrue($invoked);
$this->assertSame($exception, $reason);
}
/**
* @depends testEmit
*/
public function testEmitPendingPromise() {
$invoked = false;
$value = 1;
$deferred = new Deferred;
Loop::run(function () {
$value = 1;
$deferred = new Deferred;
$callback = function ($emitted) use (&$invoked, $value) {
$invoked = true;
$this->assertSame($emitted, $value);
};
$this->producer->emit($deferred->promise());
$this->producer->onEmit($callback);
$this->producer->emit($deferred->promise());
$deferred->resolve($value);
$this->assertFalse($invoked);
$deferred->resolve($value);
$this->assertTrue($invoked);
$this->assertTrue(yield $this->producer->advance());
$this->assertSame($value, $this->producer->getCurrent());
});
}
/**
* @depends testEmit
*/
public function testEmitSuccessfulReactPromise() {
$invoked = false;
$value = 1;
$promise = new FulfilledReactPromise($value);
Loop::run(function () {
$value = 1;
$promise = new FulfilledReactPromise($value);
$callback = function ($emitted) use (&$invoked, $value) {
$invoked = true;
$this->assertSame($emitted, $value);
};
$this->producer->emit($promise);
$this->producer->onEmit($callback);
$this->producer->emit($promise);
$this->assertTrue($invoked);
$this->assertTrue(yield $this->producer->advance());
$this->assertSame($value, $this->producer->getCurrent());
});
}
/**
* @depends testEmit
*/
public function testEmitPendingPromiseThenNonPromise() {
$invoked = false;
$deferred = new Deferred;
Loop::run(function () {
$deferred = new Deferred;
$callback = function ($emitted) use (&$invoked, &$result) {
$invoked = true;
$result = $emitted;
};
$this->producer->emit($deferred->promise());
$this->producer->onEmit($callback);
$this->producer->emit($deferred->promise());
$this->producer->emit(2);
$this->assertFalse($invoked);
$this->assertTrue(yield $this->producer->advance());
$this->assertSame(2, $this->producer->getCurrent());
$this->producer->emit(2);
$this->assertTrue($invoked);
$this->assertSame(2, $result);
$deferred->resolve(1);
$deferred->resolve(1);
$this->assertSame(1, $result);
$this->assertTrue(yield $this->producer->advance());
$this->assertSame(1, $this->producer->getCurrent());
});
}
/**
* @depends testEmit
* @expectedException \Error
* @expectedExceptionMessage Streams cannot emit values after calling resolve
* @expectedExceptionMessage Streams cannot emit values after calling complete
*/
public function testEmitAfterResolve() {
$this->producer->resolve();
public function testEmitAfterComplete() {
$this->producer->complete();
$this->producer->emit(1);
}
/**
* @depends testEmit
* @expectedException \Error
* @expectedExceptionMessage The stream was resolved before the promise result could be emitted
* @expectedExceptionMessage The stream was completed before the promise result could be emitted
*/
public function testEmitPendingPromiseThenResolve() {
public function testEmitPendingPromiseThenComplete() {
$invoked = false;
$deferred = new Deferred;
$promise = $this->producer->emit($deferred->promise());
$this->producer->resolve();
$this->producer->complete();
$deferred->resolve();
$promise->onResolve(function ($exception) use (&$invoked, &$reason) {
@ -190,7 +165,7 @@ class ProducerTraitTest extends TestCase {
/**
* @depends testEmit
* @expectedException \Error
* @expectedExceptionMessage The stream was resolved before the promise result could be emitted
* @expectedExceptionMessage The stream was completed before the promise result could be emitted
*/
public function testEmitPendingPromiseThenFail() {
$invoked = false;
@ -198,7 +173,7 @@ class ProducerTraitTest extends TestCase {
$promise = $this->producer->emit($deferred->promise());
$this->producer->resolve();
$this->producer->complete();
$deferred->fail(new \Exception);
$promise->onResolve(function ($exception) use (&$invoked, &$reason) {
@ -209,72 +184,4 @@ class ProducerTraitTest extends TestCase {
$this->assertTrue($invoked);
throw $reason;
}
public function testSubscriberThrows() {
$exception = new \Exception;
try {
Loop::run(function () use ($exception) {
$this->producer->onEmit(function () use ($exception) {
throw $exception;
});
$this->producer->emit(1);
});
} catch (\Exception $caught) {
$this->assertSame($exception, $caught);
}
}
public function testSubscriberReturnsSuccessfulPromise() {
$invoked = false;
$value = 1;
$promise = new Success($value);
$this->producer->onEmit(function () use ($promise) {
return $promise;
});
$promise = $this->producer->emit(1);
$promise->onResolve(function () use (&$invoked) {
$invoked = true;
});
$this->assertTrue($invoked);
}
public function testSubscriberReturnsFailedPromise() {
$exception = new \Exception;
$promise = new Failure($exception);
try {
Loop::run(function () use ($exception, $promise) {
$this->producer->onEmit(function () use ($promise) {
return $promise;
});
$promise = $this->producer->emit(1);
$promise->onResolve(function () use (&$invoked) {
$invoked = true;
});
$this->assertTrue($invoked);
});
} catch (\Exception $caught) {
$this->assertSame($exception, $caught);
}
}
public function testSubscriberReturnsGenerator() {
$invoked = false;
$this->producer->onEmit(function ($value) use (&$invoked) {
$invoked = true;
return $value;
yield; // Unreachable, but makes function a generator.
});
$this->producer->emit(1);
$this->assertTrue($invoked);
}
}

View File

@ -13,70 +13,62 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase {
const TIMEOUT = 10;
public function testSuccessfulPromises() {
$results = [];
Loop::run(function () use (&$results) {
Loop::run(function () {
$expected = \range(1, 3);
$stream = Stream\fromIterable([new Success(1), new Success(2), new Success(3)]);
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
while (yield $stream->advance()) {
$this->assertSame(\array_shift($expected), $stream->getCurrent());
}
});
$this->assertSame([1, 2, 3], $results);
}
public function testFailedPromises() {
$exception = new \Exception;
Loop::run(function () use (&$reason, $exception) {
Loop::run(function () {
$exception = new \Exception;
$stream = Stream\fromIterable([new Failure($exception), new Failure($exception)]);
$callback = function ($exception, $value) use (&$reason) {
$reason = $exception;
};
$stream->onResolve($callback);
try {
yield $stream->advance();
} catch (\Exception $reason) {
$this->assertSame($exception, $reason);
}
});
$this->assertSame($exception, $reason);
}
public function testMixedPromises() {
$exception = new \Exception;
$results = [];
Loop::run(function () use (&$results, &$reason, $exception) {
Loop::run(function () {
$exception = new \Exception;
$expected = \range(1, 2);
$stream = Stream\fromIterable([new Success(1), new Success(2), new Failure($exception), new Success(4)]);
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
try {
while (yield $stream->advance()) {
$this->assertSame(\array_shift($expected), $stream->getCurrent());
}
} catch (\Exception $reason) {
$this->assertSame($exception, $reason);
}
$callback = function ($exception, $value) use (&$reason) {
$reason = $exception;
};
$stream->onResolve($callback);
$this->assertEmpty($expected);
});
$this->assertSame(\range(1, 2), $results);
$this->assertSame($exception, $reason);
}
public function testPendingPromises() {
$results = [];
Loop::run(function () use (&$results) {
Loop::run(function () {
$expected = \range(1, 4);
$stream = Stream\fromIterable([new Delayed(30, 1), new Delayed(10, 2), new Delayed(20, 3), new Success(4)]);
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
while (yield $stream->advance()) {
$this->assertSame(\array_shift($expected), $stream->getCurrent());
}
});
$this->assertSame(\range(1, 4), $results);
}
public function testTraversable() {
$results = [];
Loop::run(function () use (&$results) {
Loop::run(function () {
$expected = \range(1, 4);
$generator = (function () {
foreach (\range(1, 4) as $value) {
yield $value;
@ -85,12 +77,12 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase {
$stream = Stream\fromIterable($generator);
$stream->onEmit(function ($value) use (&$results) {
$results[] = $value;
});
});
while (yield $stream->advance()) {
$this->assertSame(\array_shift($expected), $stream->getCurrent());
}
$this->assertSame(\range(1, 4), $results);
$this->assertEmpty($expected);
});
}
/**
@ -113,34 +105,32 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase {
}
public function testInterval() {
$count = 3;
$stream = Stream\fromIterable(range(1, $count), self::TIMEOUT);
Loop::run(function () {
$count = 3;
$stream = Stream\fromIterable(range(1, $count), self::TIMEOUT);
$i = 0;
$stream = Stream\map($stream, function ($value) use (&$i) {
$this->assertSame(++$i, $value);
$i = 0;
while (yield $stream->advance()) {
$this->assertSame(++$i, $stream->getCurrent());
}
$this->assertSame($count, $i);
});
Promise\wait($stream);
$this->assertSame($count, $i);
}
/**
* @depends testInterval
*/
public function testSlowConsumer() {
$invoked = 0;
$count = 5;
Loop::run(function () use (&$invoked, $count) {
Loop::run(function () use ($count) {
$stream = Stream\fromIterable(range(1, $count), self::TIMEOUT);
$stream->onEmit(function () use (&$invoked) {
++$invoked;
return new Delayed(self::TIMEOUT * 2);
});
});
for ($i = 0; yield $stream->advance(); ++$i) {
yield new Delayed(self::TIMEOUT * 2);
}
$this->assertSame($count, $invoked);
$this->assertSame($count, $i);
});
}
}

View File

@ -1,246 +0,0 @@
<?php
namespace Amp\Test;
use Amp\Delayed;
use Amp\Emitter;
use Amp\Loop;
use Amp\Producer;
use Amp\StreamIterator;
use PHPUnit\Framework\TestCase;
class StreamIteratorTest extends TestCase {
const TIMEOUT = 10;
public function testSingleEmittingStream() {
Loop::run(function () {
$value = 1;
$stream = new Producer(function (callable $emit) use ($value) {
yield $emit($value);
return $value;
});
$streamIterator = new StreamIterator($stream);
while (yield $streamIterator->advance()) {
$this->assertSame($streamIterator->getCurrent(), $value);
}
$this->assertSame($streamIterator->getResult(), $value);
});
}
/**
* @depends testSingleEmittingStream
*/
public function testFastEmittingStream() {
Loop::run(function () {
$count = 10;
$emitter = new Emitter;
$streamIterator = new StreamIterator($emitter->stream());
for ($i = 0; $i < $count; ++$i) {
$promises[] = $emitter->emit($i);
}
$emitter->resolve($i);
for ($i = 0; yield $streamIterator->advance(); ++$i) {
$this->assertSame($streamIterator->getCurrent(), $i);
}
$this->assertSame($count, $i);
$this->assertSame($streamIterator->getResult(), $i);
});
}
/**
* @depends testSingleEmittingStream
*/
public function testSlowEmittingStream() {
Loop::run(function () {
$count = 10;
$stream = new Producer(function (callable $emit) use ($count) {
for ($i = 0; $i < $count; ++$i) {
yield new Delayed(self::TIMEOUT);
yield $emit($i);
}
return $i;
});
$streamIterator = new StreamIterator($stream);
for ($i = 0; yield $streamIterator->advance(); ++$i) {
$this->assertSame($streamIterator->getCurrent(), $i);
}
$this->assertSame($count, $i);
$this->assertSame($streamIterator->getResult(), $i);
});
}
/**
* @depends testFastEmittingStream
*/
public function testDrain() {
Loop::run(function () {
$count = 10;
$expected = \range(0, $count - 1);
$emitter = new Emitter;
$streamIterator = new StreamIterator($emitter->stream());
for ($i = 0; $i < $count; ++$i) {
$promises[] = $emitter->emit($i);
}
$value = null;
if (yield $streamIterator->advance()) {
$value = $streamIterator->getCurrent();
}
$this->assertSame(reset($expected), $value);
unset($expected[0]);
$emitter->resolve($i);
$values = $streamIterator->drain();
$this->assertSame($expected, $values);
});
}
/**
* @expectedException \Error
* @expectedExceptionMessage The stream has not resolved
*/
public function testDrainBeforeResolution() {
$emitter = new Emitter;
$streamIterator = new StreamIterator($emitter->stream());
$streamIterator->drain();
}
public function testFailingStream() {
Loop::run(function () {
$exception = new \Exception;
$emitter = new Emitter;
$streamIterator = new StreamIterator($emitter->stream());
$emitter->fail($exception);
try {
while (yield $streamIterator->advance());
$this->fail("StreamIterator::advance() should throw stream failure reason");
} catch (\Exception $reason) {
$this->assertSame($exception, $reason);
}
try {
$streamIterator->getResult();
$this->fail("StreamIterator::getResult() should throw stream failure reason");
} catch (\Exception $reason) {
$this->assertSame($exception, $reason);
}
});
}
/**
* @expectedException \Error
* @expectedExceptionMessage Promise returned from advance() must resolve before calling this method
*/
public function testGetCurrentBeforeAdvanceResolves() {
$streamIterator = new StreamIterator((new Emitter)->stream());
$streamIterator->advance();
$streamIterator->getCurrent();
}
/**
* @expectedException \Error
* @expectedExceptionMessage The stream has resolved
*/
public function testGetCurrentAfterResolution() {
$emitter = new Emitter;
$streamIterator = new StreamIterator($emitter->stream());
$emitter->resolve();
$streamIterator->getCurrent();
}
/**
* @expectedException \Error
* @expectedExceptionMessage The stream has not resolved
*/
public function testGetResultBeforeResolution() {
Loop::run(function () {
$streamIterator = new StreamIterator((new Emitter)->stream());
$streamIterator->getResult();
});
}
/**
* @expectedException \Error
* @expectedExceptionMessage The prior promise returned must resolve before invoking this method again
*/
public function testConsecutiveAdvanceCalls() {
$emitter = new Emitter;
$streamIterator = new StreamIterator($emitter->stream());
$streamIterator->advance();
$streamIterator->advance();
}
public function testStreamIteratorDestroyedAfterEmits() {
$emitter = new Emitter;
$streamIterator = new StreamIterator($emitter->stream());
$promise = $emitter->emit(1);
unset($streamIterator);
$invoked = false;
$promise->onResolve(function () use (&$invoked) {
$invoked = true;
});
$this->assertTrue($invoked);
}
public function testStreamIteratorDestroyedThenStreamEmits() {
$emitter = new Emitter;
$streamIterator = new StreamIterator($emitter->stream());
$emitter->emit(1);
unset($streamIterator);
$promise = $emitter->emit(2);
$invoked = false;
$promise->onResolve(function () use (&$invoked) {
$invoked = true;
});
$this->assertTrue($invoked);
}
public function testStreamFailsWhenStreamIteratorWaiting() {
$exception = new \Exception;
$emitter = new Emitter;
$streamIterator = new StreamIterator($emitter->stream());
$promise = $streamIterator->advance();
$promise->onResolve(function ($exception, $value) use (&$reason) {
$reason = $exception;
});
$emitter->fail($exception);
$this->assertSame($exception, $reason);
}
}