From 7919a0df38862fdd35910441fac56f59c19236f5 Mon Sep 17 00:00:00 2001 From: coderstephen Date: Mon, 3 Aug 2015 01:22:17 -0500 Subject: [PATCH] Add better socket checking and error handling Also update send() to return a generator. --- src/Sync/Channel.php | 37 ++++++++++++++++++++++++++++++------- tests/Sync/ChannelTest.php | 8 ++++---- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/Sync/Channel.php b/src/Sync/Channel.php index 78aa77a..0cfd8ae 100644 --- a/src/Sync/Channel.php +++ b/src/Sync/Channel.php @@ -8,6 +8,10 @@ use Icicle\Socket\Stream\DuplexStream; * An asynchronous channel for sending data between threads and processes. * * Supports full duplex read and write. + * + * Note that the sockets are lazily bound to enable temporary thread safety. A + * channel object can be safely transferred between threads up until the point + * that the channel is used. */ class Channel { @@ -51,28 +55,38 @@ class Channel * * @param mixed $data The data to send. * - * @return PromiseInterface + * @return Generator */ public function send($data) { - $serialized = serialize($data); + // Serialize the data to send into the channel. + try { + $serialized = serialize($data); + } catch (\Exception $exception) { + throw new ChannelException('The given data is not sendable because it is not serializable.', + 0, $exception); + } $length = strlen($serialized); $header = pack('CL', self::MESSAGE_DATA, $length); $message = $header.$serialized; - return $this->getSocket()->write($message); + yield $this->getSocket()->write($message); } /** * Waits asynchronously for a message from the peer. * - * @return PromiseInterface + * @return Generator */ public function receive() { // Read the message header first and extract its contents. - $header = unpack('Ctype/Llength', (yield $this->getSocket()->read(5))); + $buffer = ''; + while (strlen($buffer) < 5) { + $buffer .= (yield $this->getSocket()->read(5)); + } + $header = unpack('Ctype/Llength', $buffer); // If the message type is MESSAGE_DONE, the peer was closed and the channel // is done. @@ -83,8 +97,17 @@ class Channel // Read the serialized data from the socket. if ($header['type'] === self::MESSAGE_DATA) { - $serialized = (yield $this->socket->read($header['length'])); - $data = unserialize($serialized); + $buffer = ''; + while (strlen($buffer) < $header['length']) { + $buffer .= (yield $this->socket->read($header['length'])); + } + + // Attempt to unserialize the received data. + try { + $data = unserialize($buffer); + } catch (\Exception $exception) { + throw new ChannelException('Received corrupt data from peer.', 0, $exception); + } yield $data; return; diff --git a/tests/Sync/ChannelTest.php b/tests/Sync/ChannelTest.php index c6896b0..f68d956 100644 --- a/tests/Sync/ChannelTest.php +++ b/tests/Sync/ChannelTest.php @@ -31,11 +31,11 @@ class ChannelTest extends \PHPUnit_Framework_TestCase public function testSendReceive() { - list($a, $b) = Channel::create(); + Coroutine\create(function () { + list($a, $b) = Channel::create(); - $a->send('hello')->then(function () use ($b) { - return new Coroutine\Coroutine($b->receive()); - })->done(function ($data) { + yield $a->send('hello'); + $data = (yield $b->receive()); $this->assertEquals('hello', $data); });