diff --git a/src/Sync/Channel.php b/src/Sync/Channel.php index bdeb68b..a5099b9 100644 --- a/src/Sync/Channel.php +++ b/src/Sync/Channel.php @@ -18,6 +18,8 @@ class Channel const MESSAGE_CLOSE = 1; const MESSAGE_DATA = 2; + const HEADER_LENGTH = 5; + /** * @var DuplexStream An asynchronous socket stream. */ @@ -80,25 +82,28 @@ class Channel { // Read the message header first and extract its contents. $buffer = ''; - while (strlen($buffer) < 5) { - $buffer .= (yield $this->getSocket()->read(5)); - } + $length = self::HEADER_LENGTH; + do { + $buffer .= (yield $this->getSocket()->read($length)); + } while (($length -= strlen($buffer)) > 0); + $header = unpack('Ctype/Llength', $buffer); // If the message type is MESSAGE_CLOSE, the peer was closed and the channel // is done. if ($header['type'] === self::MESSAGE_CLOSE) { $this->getSocket()->close(); - yield false; + yield null; return; } // Read the serialized data from the socket. if ($header['type'] === self::MESSAGE_DATA) { $buffer = ''; - while (strlen($buffer) < $header['length']) { - $buffer .= (yield $this->socket->read($header['length'])); - } + $length = $header['length']; + do { + $buffer .= (yield $this->getSocket()->read($length)); + } while (($length -= strlen($buffer)) > 0); // Attempt to unserialize the received data. try { @@ -122,9 +127,7 @@ class Channel // Create a message with just a DONE header and zero data. $message = pack('Cx4', self::MESSAGE_CLOSE); - return $this->getSocket()->write($message)->then(function () { - $this->getSocket()->close(); - }); + return $this->getSocket()->end($message); } /**