1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Fix reading from channel

This commit is contained in:
Aaron Piotrowski 2015-08-05 18:48:20 -05:00
parent 09edc25348
commit b5d97eb9d5

View File

@ -18,6 +18,8 @@ class Channel
const MESSAGE_CLOSE = 1;
const MESSAGE_DATA = 2;
const HEADER_LENGTH = 5;
/**
* @var DuplexStream An asynchronous socket stream.
*/
@ -80,25 +82,28 @@ class Channel
{
// Read the message header first and extract its contents.
$buffer = '';
while (strlen($buffer) < 5) {
$buffer .= (yield $this->getSocket()->read(5));
}
$length = self::HEADER_LENGTH;
do {
$buffer .= (yield $this->getSocket()->read($length));
} while (($length -= strlen($buffer)) > 0);
$header = unpack('Ctype/Llength', $buffer);
// If the message type is MESSAGE_CLOSE, the peer was closed and the channel
// is done.
if ($header['type'] === self::MESSAGE_CLOSE) {
$this->getSocket()->close();
yield false;
yield null;
return;
}
// Read the serialized data from the socket.
if ($header['type'] === self::MESSAGE_DATA) {
$buffer = '';
while (strlen($buffer) < $header['length']) {
$buffer .= (yield $this->socket->read($header['length']));
}
$length = $header['length'];
do {
$buffer .= (yield $this->getSocket()->read($length));
} while (($length -= strlen($buffer)) > 0);
// Attempt to unserialize the received data.
try {
@ -122,9 +127,7 @@ class Channel
// Create a message with just a DONE header and zero data.
$message = pack('Cx4', self::MESSAGE_CLOSE);
return $this->getSocket()->write($message)->then(function () {
$this->getSocket()->close();
});
return $this->getSocket()->end($message);
}
/**