mirror of
https://github.com/danog/parallel.git
synced 2024-11-30 04:39:01 +01:00
Fix bug reading long data from channel
This commit is contained in:
parent
69d85ea8e2
commit
8625e5968e
@ -64,10 +64,6 @@ class Channel implements ChannelInterface
|
||||
*/
|
||||
public function send($data)
|
||||
{
|
||||
if (!$this->write->isWritable()) {
|
||||
throw new ChannelException('The channel was unexpectedly closed. Did the context die?');
|
||||
}
|
||||
|
||||
// Serialize the data to send into the channel.
|
||||
try {
|
||||
$serialized = serialize($data);
|
||||
@ -96,18 +92,20 @@ class Channel implements ChannelInterface
|
||||
// Read the message length first to determine how much needs to be read from the stream.
|
||||
$length = self::HEADER_LENGTH;
|
||||
$buffer = '';
|
||||
$remaining = $length;
|
||||
|
||||
try {
|
||||
do {
|
||||
$buffer .= (yield $this->read->read($length));
|
||||
} while (($length -= strlen($buffer)) > 0);
|
||||
$buffer .= (yield $this->read->read($remaining));
|
||||
} while ($remaining = $length - strlen($buffer));
|
||||
|
||||
list(, $length) = unpack('L', $buffer);
|
||||
$buffer = '';
|
||||
$remaining = $length;
|
||||
|
||||
do {
|
||||
$buffer .= (yield $this->read->read($length));
|
||||
} while (($length -= strlen($buffer)) > 0);
|
||||
$buffer .= (yield $this->read->read($remaining));
|
||||
} while ($remaining = $length - strlen($buffer));
|
||||
} catch (StreamException $exception) {
|
||||
throw new ChannelException('Reading from the channel failed. Did the context die?', 0, $exception);
|
||||
}
|
||||
|
@ -5,14 +5,39 @@ use Icicle\Concurrent\Sync\Channel;
|
||||
use Icicle\Coroutine;
|
||||
use Icicle\Loop;
|
||||
use Icicle\Socket;
|
||||
use Icicle\Socket\Stream\DuplexStream;
|
||||
use Icicle\Stream\DuplexStreamInterface;
|
||||
use Icicle\Stream\Exception\UnreadableException;
|
||||
use Icicle\Stream\Exception\UnwritableException;
|
||||
use Icicle\Stream\ReadableStreamInterface;
|
||||
use Icicle\Stream\WritableStreamInterface;
|
||||
use Icicle\Tests\Concurrent\TestCase;
|
||||
|
||||
class ChannelTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @return \Icicle\Stream\DuplexStreamInterface|\PHPUnit_Framework_MockObject_MockObject
|
||||
*/
|
||||
protected function createMockStream()
|
||||
{
|
||||
$mock = $this->getMock(DuplexStreamInterface::class);
|
||||
|
||||
$buffer = '';
|
||||
|
||||
$mock->method('write')
|
||||
->will($this->returnCallback(function ($data) use (&$buffer) {
|
||||
$buffer .= $data;
|
||||
}));
|
||||
|
||||
$mock->method('read')
|
||||
->will($this->returnCallback(function ($length, $byte = null, $timeout = 0) use (&$buffer) {
|
||||
$result = substr($buffer, 0, $length);
|
||||
$buffer = substr($buffer, $length);
|
||||
return $result;
|
||||
}));
|
||||
|
||||
return $mock;
|
||||
}
|
||||
|
||||
public function testIsOpen()
|
||||
{
|
||||
$mock = $this->getMock(DuplexStreamInterface::class);
|
||||
@ -75,13 +100,42 @@ class ChannelTest extends TestCase
|
||||
public function testSendReceive()
|
||||
{
|
||||
Coroutine\create(function () {
|
||||
list($a, $b) = Socket\pair();
|
||||
$a = new Channel(new DuplexStream($a));
|
||||
$b = new Channel(new DuplexStream($b));
|
||||
$mock = $this->createMockStream();
|
||||
$a = new Channel($mock);
|
||||
$b = new Channel($mock);
|
||||
|
||||
yield $a->send('hello');
|
||||
$message = 'hello';
|
||||
|
||||
yield $a->send($message);
|
||||
$data = (yield $b->receive());
|
||||
$this->assertEquals('hello', $data);
|
||||
$this->assertSame($message, $data);
|
||||
|
||||
$a->close();
|
||||
$b->close();
|
||||
})->done();
|
||||
|
||||
Loop\run();
|
||||
}
|
||||
|
||||
/**
|
||||
* @depends testSendReceive
|
||||
*/
|
||||
public function testSendReceiveLongData()
|
||||
{
|
||||
Coroutine\create(function () {
|
||||
$mock = $this->createMockStream();
|
||||
$a = new Channel($mock);
|
||||
$b = new Channel($mock);
|
||||
|
||||
$length = 0xffff;
|
||||
$message = '';
|
||||
for ($i = 0; $i < $length; ++$i) {
|
||||
$message .= chr(mt_rand(0, 255));
|
||||
}
|
||||
|
||||
yield $a->send($message);
|
||||
$data = (yield $b->receive());
|
||||
$this->assertSame($message, $data);
|
||||
|
||||
$a->close();
|
||||
$b->close();
|
||||
@ -97,12 +151,12 @@ class ChannelTest extends TestCase
|
||||
public function testInvalidDataReceived()
|
||||
{
|
||||
Coroutine\create(function () {
|
||||
list($a, $b) = Socket\pair();
|
||||
$a = new Channel($stream = new DuplexStream($a));
|
||||
$b = new Channel(new DuplexStream($b));
|
||||
$mock = $this->createMockStream();
|
||||
$a = new Channel($mock);
|
||||
$b = new Channel($mock);
|
||||
|
||||
// Close $a. $b should close on next read...
|
||||
yield $stream->write(pack('L', 10) . '1234567890');
|
||||
yield $mock->write(pack('L', 10) . '1234567890');
|
||||
$data = (yield $b->receive());
|
||||
})->done();
|
||||
|
||||
@ -116,9 +170,9 @@ class ChannelTest extends TestCase
|
||||
public function testSendUnserializableData()
|
||||
{
|
||||
Coroutine\create(function () {
|
||||
list($a, $b) = Socket\pair();
|
||||
$a = new Channel(new DuplexStream($a));
|
||||
$b = new Channel(new DuplexStream($b));
|
||||
$mock = $this->createMockStream();
|
||||
$a = new Channel($mock);
|
||||
$b = new Channel($mock);
|
||||
|
||||
// Close $a. $b should close on next read...
|
||||
yield $a->send(function () {});
|
||||
@ -135,13 +189,14 @@ class ChannelTest extends TestCase
|
||||
public function testSendAfterClose()
|
||||
{
|
||||
Coroutine\create(function () {
|
||||
list($a, $b) = Socket\pair();
|
||||
$a = new Channel(new DuplexStream($a));
|
||||
$b = new Channel(new DuplexStream($b));
|
||||
$mock = $this->getMock(DuplexStreamInterface::class);
|
||||
$mock->expects($this->once())
|
||||
->method('write')
|
||||
->will($this->throwException(new UnwritableException()));
|
||||
|
||||
$a->close();
|
||||
$a = new Channel($mock);
|
||||
$b = new Channel($this->getMock(DuplexStreamInterface::class));
|
||||
|
||||
// Close $a. $b should close on next read...
|
||||
yield $a->send('hello');
|
||||
})->done();
|
||||
|
||||
@ -155,13 +210,13 @@ class ChannelTest extends TestCase
|
||||
public function testReceiveAfterClose()
|
||||
{
|
||||
Coroutine\create(function () {
|
||||
list($a, $b) = Socket\pair();
|
||||
$a = new Channel(new DuplexStream($a));
|
||||
$b = new Channel(new DuplexStream($b));
|
||||
$mock = $this->getMock(DuplexStreamInterface::class);
|
||||
$mock->expects($this->once())
|
||||
->method('read')
|
||||
->will($this->throwException(new UnreadableException()));
|
||||
|
||||
$a->close();
|
||||
$a = new Channel($mock);
|
||||
|
||||
// Close $a. $b should close on next read...
|
||||
$data = (yield $a->receive());
|
||||
})->done();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user