1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-27 04:44:56 +01:00

Add better socket checking and error handling

Also update send() to return a generator.
This commit is contained in:
coderstephen 2015-08-03 01:22:17 -05:00
parent f121adb577
commit 7919a0df38
2 changed files with 34 additions and 11 deletions

View File

@ -8,6 +8,10 @@ use Icicle\Socket\Stream\DuplexStream;
* An asynchronous channel for sending data between threads and processes. * An asynchronous channel for sending data between threads and processes.
* *
* Supports full duplex read and write. * Supports full duplex read and write.
*
* Note that the sockets are lazily bound to enable temporary thread safety. A
* channel object can be safely transferred between threads up until the point
* that the channel is used.
*/ */
class Channel class Channel
{ {
@ -51,28 +55,38 @@ class Channel
* *
* @param mixed $data The data to send. * @param mixed $data The data to send.
* *
* @return PromiseInterface * @return Generator
*/ */
public function send($data) public function send($data)
{ {
$serialized = serialize($data); // Serialize the data to send into the channel.
try {
$serialized = serialize($data);
} catch (\Exception $exception) {
throw new ChannelException('The given data is not sendable because it is not serializable.',
0, $exception);
}
$length = strlen($serialized); $length = strlen($serialized);
$header = pack('CL', self::MESSAGE_DATA, $length); $header = pack('CL', self::MESSAGE_DATA, $length);
$message = $header.$serialized; $message = $header.$serialized;
return $this->getSocket()->write($message); yield $this->getSocket()->write($message);
} }
/** /**
* Waits asynchronously for a message from the peer. * Waits asynchronously for a message from the peer.
* *
* @return PromiseInterface * @return Generator
*/ */
public function receive() public function receive()
{ {
// Read the message header first and extract its contents. // Read the message header first and extract its contents.
$header = unpack('Ctype/Llength', (yield $this->getSocket()->read(5))); $buffer = '';
while (strlen($buffer) < 5) {
$buffer .= (yield $this->getSocket()->read(5));
}
$header = unpack('Ctype/Llength', $buffer);
// If the message type is MESSAGE_DONE, the peer was closed and the channel // If the message type is MESSAGE_DONE, the peer was closed and the channel
// is done. // is done.
@ -83,8 +97,17 @@ class Channel
// Read the serialized data from the socket. // Read the serialized data from the socket.
if ($header['type'] === self::MESSAGE_DATA) { if ($header['type'] === self::MESSAGE_DATA) {
$serialized = (yield $this->socket->read($header['length'])); $buffer = '';
$data = unserialize($serialized); while (strlen($buffer) < $header['length']) {
$buffer .= (yield $this->socket->read($header['length']));
}
// Attempt to unserialize the received data.
try {
$data = unserialize($buffer);
} catch (\Exception $exception) {
throw new ChannelException('Received corrupt data from peer.', 0, $exception);
}
yield $data; yield $data;
return; return;

View File

@ -31,11 +31,11 @@ class ChannelTest extends \PHPUnit_Framework_TestCase
public function testSendReceive() public function testSendReceive()
{ {
list($a, $b) = Channel::create(); Coroutine\create(function () {
list($a, $b) = Channel::create();
$a->send('hello')->then(function () use ($b) { yield $a->send('hello');
return new Coroutine\Coroutine($b->receive()); $data = (yield $b->receive());
})->done(function ($data) {
$this->assertEquals('hello', $data); $this->assertEquals('hello', $data);
}); });