2016-12-30 02:16:04 +01:00
|
|
|
<?php
|
2015-08-03 07:20:06 +02:00
|
|
|
|
2016-08-23 23:47:40 +02:00
|
|
|
namespace Amp\Parallel\Test\Sync;
|
2016-08-18 18:04:48 +02:00
|
|
|
|
2017-05-10 09:05:35 +02:00
|
|
|
use Amp\ByteStream\InputStream;
|
|
|
|
use Amp\ByteStream\OutputStream;
|
2017-05-18 09:51:31 +02:00
|
|
|
use Amp\ByteStream\StreamException;
|
2017-03-16 23:03:59 +01:00
|
|
|
use Amp\Loop;
|
2016-08-23 23:47:40 +02:00
|
|
|
use Amp\Parallel\Sync\ChannelledStream;
|
2017-03-22 05:19:15 +01:00
|
|
|
use Amp\PHPUnit\TestCase;
|
2017-05-10 09:05:35 +02:00
|
|
|
use Amp\Promise;
|
2016-08-19 00:36:58 +02:00
|
|
|
use Amp\Success;
|
|
|
|
|
2018-10-07 16:50:45 +02:00
|
|
|
class ChannelledStreamTest extends TestCase
|
|
|
|
{
|
2015-09-27 18:15:47 +02:00
|
|
|
/**
|
2017-05-10 09:05:35 +02:00
|
|
|
* @return \Amp\ByteStream\InputStream|\Amp\ByteStream\OutputStream
|
2015-09-27 18:15:47 +02:00
|
|
|
*/
|
2018-10-07 16:50:45 +02:00
|
|
|
protected function createMockStream()
|
|
|
|
{
|
2017-05-10 09:05:35 +02:00
|
|
|
return new class implements InputStream, OutputStream {
|
|
|
|
private $buffer = "";
|
2015-09-27 18:15:47 +02:00
|
|
|
|
2018-10-07 16:50:45 +02:00
|
|
|
public function read(): Promise
|
|
|
|
{
|
2017-05-10 09:05:35 +02:00
|
|
|
$data = $this->buffer;
|
|
|
|
$this->buffer = "";
|
|
|
|
return new Success($data);
|
|
|
|
}
|
2015-09-27 18:15:47 +02:00
|
|
|
|
2018-10-07 16:50:45 +02:00
|
|
|
public function write(string $data): Promise
|
|
|
|
{
|
2017-05-10 09:05:35 +02:00
|
|
|
$this->buffer .= $data;
|
2016-08-19 00:36:58 +02:00
|
|
|
return new Success(\strlen($data));
|
2017-05-10 09:05:35 +02:00
|
|
|
}
|
2017-04-16 17:12:42 +02:00
|
|
|
|
2018-10-07 16:50:45 +02:00
|
|
|
public function end(string $finalData = ""): Promise
|
|
|
|
{
|
2017-05-10 09:05:35 +02:00
|
|
|
throw new \BadMethodCallException;
|
|
|
|
}
|
2015-09-27 18:15:47 +02:00
|
|
|
|
2018-10-07 16:50:45 +02:00
|
|
|
public function close()
|
|
|
|
{
|
2017-05-10 09:05:35 +02:00
|
|
|
throw new \BadMethodCallException;
|
|
|
|
}
|
|
|
|
};
|
2015-09-27 18:15:47 +02:00
|
|
|
}
|
2017-05-10 09:05:35 +02:00
|
|
|
|
2018-10-07 16:50:45 +02:00
|
|
|
public function testSendReceive()
|
|
|
|
{
|
2017-03-16 23:03:59 +01:00
|
|
|
Loop::run(function () {
|
2015-09-27 18:15:47 +02:00
|
|
|
$mock = $this->createMockStream();
|
2017-03-25 07:19:46 +01:00
|
|
|
$a = new ChannelledStream($mock, $mock);
|
|
|
|
$b = new ChannelledStream($mock, $mock);
|
2015-08-03 07:20:06 +02:00
|
|
|
|
2015-09-27 18:15:47 +02:00
|
|
|
$message = 'hello';
|
|
|
|
|
2016-08-19 00:36:58 +02:00
|
|
|
yield $a->send($message);
|
|
|
|
$data = yield $b->receive();
|
2015-09-27 18:15:47 +02:00
|
|
|
$this->assertSame($message, $data);
|
2017-03-16 23:03:59 +01:00
|
|
|
});
|
2015-09-27 18:15:47 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @depends testSendReceive
|
|
|
|
*/
|
2018-10-07 16:50:45 +02:00
|
|
|
public function testSendReceiveLongData()
|
|
|
|
{
|
2017-03-16 23:03:59 +01:00
|
|
|
Loop::run(function () {
|
2015-09-27 18:15:47 +02:00
|
|
|
$mock = $this->createMockStream();
|
2017-03-25 07:19:46 +01:00
|
|
|
$a = new ChannelledStream($mock, $mock);
|
|
|
|
$b = new ChannelledStream($mock, $mock);
|
2015-09-27 18:15:47 +02:00
|
|
|
|
|
|
|
$length = 0xffff;
|
|
|
|
$message = '';
|
|
|
|
for ($i = 0; $i < $length; ++$i) {
|
2018-10-07 16:50:45 +02:00
|
|
|
$message .= \chr(\mt_rand(0, 255));
|
2015-09-27 18:15:47 +02:00
|
|
|
}
|
|
|
|
|
2016-08-19 00:36:58 +02:00
|
|
|
yield $a->send($message);
|
|
|
|
$data = yield $b->receive();
|
2015-09-27 18:15:47 +02:00
|
|
|
$this->assertSame($message, $data);
|
2017-03-16 23:03:59 +01:00
|
|
|
});
|
2015-08-03 07:20:06 +02:00
|
|
|
}
|
2015-09-03 01:29:48 +02:00
|
|
|
|
|
|
|
/**
|
|
|
|
* @depends testSendReceive
|
2017-05-18 06:13:29 +02:00
|
|
|
* @expectedException \Amp\Parallel\Sync\ChannelException
|
2015-09-03 01:29:48 +02:00
|
|
|
*/
|
2018-10-07 16:50:45 +02:00
|
|
|
public function testInvalidDataReceived()
|
|
|
|
{
|
2017-03-16 23:03:59 +01:00
|
|
|
Loop::run(function () {
|
2015-09-27 18:15:47 +02:00
|
|
|
$mock = $this->createMockStream();
|
2017-03-25 07:19:46 +01:00
|
|
|
$a = new ChannelledStream($mock, $mock);
|
|
|
|
$b = new ChannelledStream($mock, $mock);
|
2015-09-03 01:29:48 +02:00
|
|
|
|
|
|
|
// Close $a. $b should close on next read...
|
2018-10-07 16:50:45 +02:00
|
|
|
yield $mock->write(\pack('L', 10) . '1234567890');
|
2016-08-19 00:36:58 +02:00
|
|
|
$data = yield $b->receive();
|
2017-03-16 23:03:59 +01:00
|
|
|
});
|
2015-09-03 01:29:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @depends testSendReceive
|
2018-10-24 05:44:02 +02:00
|
|
|
* @expectedException \Amp\Parallel\Sync\SerializationException
|
2015-09-03 01:29:48 +02:00
|
|
|
*/
|
2018-10-07 16:50:45 +02:00
|
|
|
public function testSendUnserializableData()
|
|
|
|
{
|
2017-03-16 23:03:59 +01:00
|
|
|
Loop::run(function () {
|
2015-09-27 18:15:47 +02:00
|
|
|
$mock = $this->createMockStream();
|
2017-03-25 07:19:46 +01:00
|
|
|
$a = new ChannelledStream($mock, $mock);
|
|
|
|
$b = new ChannelledStream($mock, $mock);
|
2015-09-03 01:29:48 +02:00
|
|
|
|
|
|
|
// Close $a. $b should close on next read...
|
2016-08-19 00:36:58 +02:00
|
|
|
yield $a->send(function () {});
|
|
|
|
$data = yield $b->receive();
|
2017-03-16 23:03:59 +01:00
|
|
|
});
|
2015-09-03 01:29:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @depends testSendReceive
|
2017-05-18 06:13:29 +02:00
|
|
|
* @expectedException \Amp\Parallel\Sync\ChannelException
|
2015-09-03 01:29:48 +02:00
|
|
|
*/
|
2018-10-07 16:50:45 +02:00
|
|
|
public function testSendAfterClose()
|
|
|
|
{
|
2017-03-16 23:03:59 +01:00
|
|
|
Loop::run(function () {
|
2017-05-10 09:05:35 +02:00
|
|
|
$mock = $this->createMock(OutputStream::class);
|
2015-09-27 18:15:47 +02:00
|
|
|
$mock->expects($this->once())
|
|
|
|
->method('write')
|
2017-04-16 17:12:42 +02:00
|
|
|
->will($this->throwException(new StreamException));
|
2015-09-03 01:29:48 +02:00
|
|
|
|
2017-05-10 09:05:35 +02:00
|
|
|
$a = new ChannelledStream($this->createMock(InputStream::class), $mock);
|
2017-03-25 07:19:46 +01:00
|
|
|
$b = new ChannelledStream(
|
2017-05-10 09:05:35 +02:00
|
|
|
$this->createMock(InputStream::class),
|
|
|
|
$this->createMock(OutputStream::class)
|
2017-03-25 07:19:46 +01:00
|
|
|
);
|
2015-09-03 01:29:48 +02:00
|
|
|
|
2016-08-19 00:36:58 +02:00
|
|
|
yield $a->send('hello');
|
2017-03-16 23:03:59 +01:00
|
|
|
});
|
2015-09-03 01:29:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @depends testSendReceive
|
2017-05-18 06:13:29 +02:00
|
|
|
* @expectedException \Amp\Parallel\Sync\ChannelException
|
2015-09-03 01:29:48 +02:00
|
|
|
*/
|
2018-10-07 16:50:45 +02:00
|
|
|
public function testReceiveAfterClose()
|
|
|
|
{
|
2017-03-16 23:03:59 +01:00
|
|
|
Loop::run(function () {
|
2017-05-10 09:05:35 +02:00
|
|
|
$mock = $this->createMock(InputStream::class);
|
2015-09-27 18:15:47 +02:00
|
|
|
$mock->expects($this->once())
|
2017-05-10 09:05:35 +02:00
|
|
|
->method('read')
|
|
|
|
->willReturn(new Success(null));
|
2015-09-03 01:29:48 +02:00
|
|
|
|
2017-05-10 09:05:35 +02:00
|
|
|
$a = new ChannelledStream($mock, $this->createMock(OutputStream::class));
|
2015-09-03 01:29:48 +02:00
|
|
|
|
2016-08-19 00:36:58 +02:00
|
|
|
$data = yield $a->receive();
|
2017-03-16 23:03:59 +01:00
|
|
|
});
|
2015-09-03 01:29:48 +02:00
|
|
|
}
|
2015-08-03 07:20:06 +02:00
|
|
|
}
|