1
0
mirror of https://github.com/danog/parallel.git synced 2024-12-03 10:07:49 +01:00
parallel/test/Sync/ChannelledStreamTest.php

152 lines
4.2 KiB
PHP
Raw Normal View History

2016-12-30 02:16:04 +01:00
<?php
2015-08-03 07:20:06 +02:00
2016-08-23 23:47:40 +02:00
namespace Amp\Parallel\Test\Sync;
2016-08-18 18:04:48 +02:00
2017-03-24 01:25:35 +01:00
use Amp\ByteStream\DuplexStream;
2017-03-25 07:19:46 +01:00
use Amp\ByteStream\ReadableStream;
use Amp\ByteStream\StreamException;
2017-03-25 07:19:46 +01:00
use Amp\ByteStream\WritableStream;
use Amp\Loop;
2016-08-23 23:47:40 +02:00
use Amp\Parallel\Sync\ChannelledStream;
use Amp\PHPUnit\TestCase;
2016-08-19 00:36:58 +02:00
use Amp\Success;
/**
 * Tests for ChannelledStream using mocked byte streams.
 *
 * Most tests share a single loop-back mock (see createMockStream()) between
 * two ChannelledStream instances so that data sent through one can be
 * received through the other.
 */
class ChannelledStreamTest extends TestCase {
    /**
     * Builds a DuplexStream mock that loops written data back to the reader.
     *
     * Everything passed to write() is appended to an in-memory buffer and a
     * Success holding the byte count is returned. advance() always resolves
     * to true, and getChunk() returns the buffered bytes and empties the buffer.
     *
     * @return \Amp\ByteStream\DuplexStream|\PHPUnit_Framework_MockObject_MockObject
     */
    protected function createMockStream() {
        $mock = $this->createMock(DuplexStream::class);

        // Shared by reference between the write() and getChunk() stubs.
        $buffer = '';

        $mock->method('write')
            ->willReturnCallback(static function ($data) use (&$buffer) {
                $buffer .= $data;
                return new Success(\strlen($data));
            });

        $mock->method('advance')
            ->willReturn(new Success(true));

        $mock->method('getChunk')
            ->willReturnCallback(static function () use (&$buffer) {
                $result = $buffer;
                $buffer = '';
                return $result;
            });

        return $mock;
    }

    /**
     * A short string sent on one channel is received unchanged on the other.
     */
    public function testSendReceive() {
        Loop::run(function () {
            $mock = $this->createMockStream();
            $a = new ChannelledStream($mock, $mock);
            $b = new ChannelledStream($mock, $mock);

            $message = 'hello';

            yield $a->send($message);
            $data = yield $b->receive();

            $this->assertSame($message, $data);
        });
    }

    /**
     * A 0xffff-byte random binary string survives a send/receive round trip.
     *
     * @depends testSendReceive
     */
    public function testSendReceiveLongData() {
        Loop::run(function () {
            $mock = $this->createMockStream();
            $a = new ChannelledStream($mock, $mock);
            $b = new ChannelledStream($mock, $mock);

            // Arbitrary binary payload; every byte value may occur.
            $message = \random_bytes(0xffff);

            yield $a->send($message);
            $data = yield $b->receive();

            $this->assertSame($message, $data);
        });
    }

    /**
     * Receiving bytes that were not produced by send() must fail with a
     * ChannelException.
     *
     * @depends testSendReceive
     */
    public function testInvalidDataReceived() {
        $this->expectException(\Amp\Parallel\ChannelException::class);

        Loop::run(function () {
            $mock = $this->createMockStream();
            $channel = new ChannelledStream($mock, $mock);

            // Write a hand-built payload straight to the stream, bypassing
            // send(); the test expects receive() to reject it.
            yield $mock->write(\pack('L', 10) . '1234567890');
            yield $channel->receive();
        });
    }

    /**
     * Sending a closure must fail with a ChannelException.
     *
     * @depends testSendReceive
     */
    public function testSendUnserializableData() {
        $this->expectException(\Amp\Parallel\ChannelException::class);

        Loop::run(function () {
            $mock = $this->createMockStream();
            $a = new ChannelledStream($mock, $mock);
            $b = new ChannelledStream($mock, $mock);

            // NOTE(review): the exception is presumably raised by send() because
            // the closure cannot be serialized - confirm against ChannelledStream.
            yield $a->send(function () {});

            // Kept from the original test, so a failure that only surfaces on
            // the read side still satisfies the expectation.
            yield $b->receive();
        });
    }

    /**
     * When the underlying write() throws a StreamException, send() is expected
     * to fail with a ChannelException.
     *
     * @depends testSendReceive
     */
    public function testSendAfterClose() {
        $this->expectException(\Amp\Parallel\ChannelException::class);

        Loop::run(function () {
            $mock = $this->createMock(DuplexStream::class);
            $mock->expects($this->once())
                ->method('write')
                ->willThrowException(new StreamException);

            $a = new ChannelledStream($mock, $mock);

            yield $a->send('hello');
        });
    }

    /**
     * When the underlying advance() resolves to false, receive() is expected
     * to fail with a ChannelException.
     *
     * @depends testSendReceive
     */
    public function testReceiveAfterClose() {
        $this->expectException(\Amp\Parallel\ChannelException::class);

        Loop::run(function () {
            $mock = $this->createMock(DuplexStream::class);
            $mock->expects($this->once())
                ->method('advance')
                ->willReturn(new Success(false));

            $a = new ChannelledStream($mock, $mock);

            yield $a->receive();
        });
    }
}