continue()) { $this->assertSame(\array_shift($expected), $value); } } /** * @depends testMerge */ public function testMergeWithDelayedYields(): void { $pipelines = []; $values1 = [new Delayed(10, 1), new Delayed(50, 2), new Delayed(70, 3)]; $values2 = [new Delayed(20, 4), new Delayed(40, 5), new Delayed(60, 6)]; $expected = [1, 4, 5, 2, 6, 3]; $pipelines[] = new AsyncGenerator(function () use ($values1) { foreach ($values1 as $value) { yield await($value); } }); $pipelines[] = new AsyncGenerator(function () use ($values2) { foreach ($values2 as $value) { yield await($value); } }); $pipeline = Pipeline\merge($pipelines); while (null !== $value = $pipeline->continue()) { $this->assertSame(\array_shift($expected), $value); } } /** * @depends testMerge */ public function testMergeWithFailedPipeline(): void { $exception = new TestException; $generator = new AsyncGenerator(static function () use ($exception) { yield 1; // Emit once before failing. throw $exception; }); $pipeline = Pipeline\merge([$generator, $unused = Pipeline\fromIterable(\range(1, 5))]); try { await(Pipeline\discard($pipeline)); $this->fail("The exception used to fail the pipeline should be thrown from continue()"); } catch (TestException $reason) { $this->assertSame($exception, $reason); } finally { await(Pipeline\discard($unused)); } } public function testNonPipeline(): void { $this->expectException(\TypeError::class); /** @noinspection PhpParamsInspection */ Pipeline\merge([1]); } }