getStreamPair(); $message = \str_repeat(".", self::LARGE_MESSAGE_SIZE); \Amp\Promise\rethrow($a->end($message)); $received = ""; while (null !== $chunk = yield $b->read()) { $received .= $chunk; } $this->assertSame($message, $received); }); }

    /**
     * Writes 128 payloads of 8192 bytes each to one end of the stream pair, ends the stream and
     * checks that the other end reads back exactly the concatenation of all payloads.
     */
    public function testManySmallPayloads()
    {
        Loop::run(function () {
            list($a, $b) = $this->getStreamPair();

            $message = \str_repeat(".", 8192 /* default chunk size */);

            for ($i = 0; $i < 128; $i++) {
                \Amp\Promise\rethrow($a->write($message));
            }

            $a->end();

            $received = "";
            while (null !== $chunk = yield $b->read()) {
                $received .= $chunk;
            }

            // After the loop $i holds the number of writes issued (128).
            $this->assertSame(\str_repeat($message, $i), $received);
        });
    }

    /**
     * Starts writing a payload of LARGE_MESSAGE_SIZE bytes, closes the reading end after a single
     * read and expects the pending write promise to fail with a StreamException.
     */
    public function testThrowsOnExternallyShutdownStreamWithLargePayload()
    {
        $this->expectException(StreamException::class);

        Loop::run(function () {
            try { /* prevent crashes with phpdbg due to SIGPIPE not being handled... */
                Loop::onSignal(\defined("SIGPIPE") ? SIGPIPE : 13, function () {
                });
            } catch (Loop\UnsupportedFeatureException $e) {
                // The no-op signal handler could not be registered; the test proceeds without it.
            }

            list($a, $b) = $this->getStreamPair();

            $message = \str_repeat(".", self::LARGE_MESSAGE_SIZE);

            $writePromise = $a->write($message);

            yield $b->read();
            $b->close();

            yield $writePromise;
        });
    }

    /**
     * Issues 128 writes of 8192 bytes each, closes the reading end after a single read and
     * expects the promise of the last write to fail with a StreamException.
     */
    public function testThrowsOnExternallyShutdownStreamWithSmallPayloads()
    {
        $this->expectException(StreamException::class);

        Loop::run(function () {
            try { /* prevent crashes with phpdbg due to SIGPIPE not being handled... */
                Loop::onSignal(\defined("SIGPIPE") ? SIGPIPE : 13, function () {
                });
            } catch (Loop\UnsupportedFeatureException $e) {
                // The no-op signal handler could not be registered; the test proceeds without it.
            }

            list($a, $b) = $this->getStreamPair();

            $message = \str_repeat(".", 8192 /* default chunk size */);

            // Only the promise of the final write is awaited below.
            for ($i = 0; $i < 128; $i++) {
                $lastWritePromise = $a->write($message);
            }

            yield $b->read();
            $b->close();

            yield $lastWritePromise;
        });
    }

    /**
     * Ends the writing side with an 8192 byte payload and closes it immediately afterwards;
     * awaiting the promise returned by end() is expected to throw a ClosedException.
     *
     * NOTE(review): 4096 is passed to getStreamPair(), presumably a size smaller than the payload
     * so the write cannot complete at once - confirm against getStreamPair().
     */
    public function testThrowsOnCloseBeforeWritingComplete()
    {
        $this->expectException(ClosedException::class);

        Loop::run(function () {
            list($a, $b) = $this->getStreamPair(4096);

            $message = \str_repeat(".", 8192 /* default chunk size */);

            $lastWritePromise = $a->end($message);

            $a->close();

            yield $lastWritePromise;
        });
    }

    /**
     * Closes the writing side first and then calls end() with a payload; awaiting the returned
     * promise is expected to throw a StreamException.
     */
    public function testThrowsOnStreamNotWritable()
    {
        $this->expectException(StreamException::class);

        Loop::run(function () {
            list($a, $b) = $this->getStreamPair();

            $message = \str_repeat(".", 8192 /* default chunk size */);

            $a->close();

            $lastWritePromise = $a->end($message);

            yield $lastWritePromise;
        });
    }

    /**
     * Expects reference() on an already closed reading stream to throw an \Error.
     */
    public function testThrowsOnReferencingClosedStream()
    {
        $this->expectException(\Error::class);

        Loop::run(function () {
            list($a, $b) = $this->getStreamPair();

            $b->close();

            $b->reference();
        });
    }

    /**
     * Expects unreference() on an already closed reading stream to throw an \Error.
     */
    public function testThrowsOnUnreferencingClosedStream()
    {
        $this->expectException(\Error::class);

        Loop::run(function () {
            list($a, $b) = $this->getStreamPair();

            $b->close();

            $b->unreference();
        });
    }

    /**
     * Expects a second read() issued while the first one is still pending to throw a
     * PendingReadError.
     */
    public function testThrowsOnPendingRead()
    {
        $this->expectException(PendingReadError::class);

        Loop::run(function () {
            list($a, $b) = $this->getStreamPair();

            $b->read();
            $b->read();
        });
    }

    /**
     * Checks that read() on an already closed reading stream returns a Success instance.
     */
    public function testResolveSuccessOnClosedStream()
    {
        Loop::run(function () {
            list($a, $b) = $this->getStreamPair();

            $b->close();

            $this->assertInstanceOf(Success::class, $b->read());
        });
    }

    /**
     * Sends a single 8192 byte payload through a pair created with getStreamPair(4096) and checks
     * that the chunks read from the other end concatenate to the original payload.
     */
    public function testChunkedPayload()
    {
        Loop::run(function () {
            list($a, $b) = $this->getStreamPair(4096);

            $message = \str_repeat(".", 8192 /* default chunk size */);

            \Amp\Promise\rethrow($a->end($message));

            $received = "";
            while (null !== $chunk = yield $b->read()) {
                $received .= $chunk;
            }

            $this->assertSame($message, $received);
        });
    }

    /**
     * Ends the writing side with an empty string and checks that the reading side yields no data
     * before read() resolves with null.
     */
    public function testEmptyPayload()
    {
        Loop::run(function () {
            list($a, $b) = $this->getStreamPair(4096);

            $message = "";

            \Amp\Promise\rethrow($a->end($message));

            $received = "";
            while (null !== $chunk = yield $b->read()) {
                $received .= $chunk;
            }

            $this->assertSame($message, $received);
        });
    }

    /**
     * Ends the writing side with an 8192 byte payload and checks that the reading side receives
     * the complete payload before read() resolves with null.
     */
    public function testCloseStreamAfterEndPayload()
    {
        Loop::run(function () {
            list($a, $b) = $this->getStreamPair();

            $message = \str_repeat(".", 8192 /* default chunk size */);

            \Amp\Promise\rethrow($a->end($message));

            $received = "";
            while (null !== $chunk = yield $b->read()) {
                $received .= $chunk;
            }

            $this->assertSame($message, $received);
        });
    }

    /**
     * Pipes this source file into a temporary file and, without awaiting the pipe, reads the
     * temporary file back through a second stream until as many bytes as this file contains have
     * been collected, then compares the result with the file.
     *
     * The temporary file is removed again when the test finishes, whether it passes or fails.
     */
    public function testIssue47()
    {
        Loop::run(function () {
            $middle = \tempnam(\sys_get_temp_dir(), 'byte-stream-middle-');

            $sink = null;
            $middleReadStream = null;

            try {
                $source = new ResourceInputStream(\fopen(__FILE__, 'rb'));
                $sink = new ResourceOutputStream(\fopen($middle, 'wb'));

                // Intentionally not awaited: the read loop below runs while the pipe may still be writing.
                \Amp\ByteStream\pipe($source, $sink);

                $middleReadStream = new ResourceInputStream(\fopen($middle, 'rb'));

                $buffer = '';
                $expectedLength = \filesize(__FILE__);

                yield new Delayed(0);

                while (\strlen($buffer) < $expectedLength) {
                    $buffer .= yield $middleReadStream->read();
                }

                $this->assertStringEqualsFile(__FILE__, $buffer);
            } finally {
                // Release every handle on the temporary file before deleting it; some platforms
                // refuse to unlink a file that is still open.
                if ($middleReadStream !== null) {
                    $middleReadStream->close();
                }

                if ($sink !== null) {
                    $sink->close();
                }

                \unlink($middle);
            }
        });
    }

    /**
     * With a chunk size of 1 on both ends, writes 'foo' and expects the first read to return 'f';
     * after raising the reader's chunk size to 3 the next read is expected to return 'oo'.
     */
    public function testSetChunkSize()
    {
        Loop::run(function () {
            list($a, $b) = $this->getStreamPair();

            $a->setChunkSize(1);
            $b->setChunkSize(1);

            $this->assertSame(3, yield $a->write('foo'));
            $this->assertSame('f', yield $b->read());

            $b->setChunkSize(3);

            $this->assertSame('oo', yield $b->read());
        });
    }
}