1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Use parser package; refactor ChannelledSocket

This commit is contained in:
Aaron Piotrowski 2017-06-07 23:33:13 -05:00
parent 5a36a60651
commit f9be01adb5
4 changed files with 105 additions and 351 deletions

View File

@ -21,8 +21,9 @@
}
],
"require": {
"amphp/amp": "^2",
"amphp/amp": "^2.0",
"amphp/byte-stream": "dev-master as 0.1",
"amphp/parser": "^1.0",
"amphp/process": "dev-amp_v2 as 0.2"
},
"require-dev": {

View File

@ -0,0 +1,83 @@
<?php
namespace Amp\Parallel\Sync;
use Amp\CallableMaker;
use Amp\Parser\Parser;
class ChannelParser extends Parser {
use CallableMaker;
const HEADER_LENGTH = 5;
/**
* @param callable(mixed $data) Callback invoked when data is parsed.
*/
public function __construct(callable $callback) {
parent::__construct(self::parser($callback, self::callableFromStaticMethod("errorHandler")));
}
/**
* @param mixed $data Data to encode to send over a channel.
*
* @return string Encoded data that can be parsed by this class.
*
* @throws \Amp\Parallel\Sync\SerializationException
*/
public function encode($data): string {
try {
$data = \serialize($data);
} catch (\Throwable $exception) {
throw new SerializationException(
"The given data cannot be sent because it is not serializable.", $exception
);
}
return \pack("CL", 0, \strlen($data)) . $data;
}
/**
* @param \SplQueue $queue
* @param callable $errorHandler
*
* @return \Generator
*
* @throws \Amp\Parallel\Sync\ChannelException
* @throws \Amp\Parallel\Sync\SerializationException
*/
private static function parser(callable $callback, callable $errorHandler): \Generator {
while (true) {
$data = \unpack("Cprefix/Llength", yield self::HEADER_LENGTH);
if ($data["prefix"] !== 0) {
throw new ChannelException("Invalid header received");
}
$data = yield $data["length"];
\set_error_handler($errorHandler);
// Attempt to unserialize the received data.
try {
$data = \unserialize($data);
} catch (\Throwable $exception) {
throw new SerializationException("Exception thrown when unserializing data", $exception);
} finally {
\restore_error_handler();
}
$callback($data);
}
}
private static function errorHandler($errno, $errstr, $errfile, $errline) {
if ($errno & \error_reporting()) {
throw new ChannelException(\sprintf(
'Received corrupted data. Errno: %d; %s in file %s on line %d',
$errno,
$errstr,
$errfile,
$errline
));
}
}
}

View File

@ -1,308 +1,34 @@
<?php
namespace Amp\Parallel\Sync;
use Amp\Deferred;
use Amp\Failure;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
class ChannelledSocket implements Channel {
const HEADER_LENGTH = 5;
class ChannelledSocket extends ChannelledStream {
/** @var \Amp\ByteStream\ResourceInputStream */
private $read;
/** @var resource Stream resource. */
private $readResource;
/** @var resource Stream resource. */
private $writeResource;
/** @var string onReadable loop watcher. */
private $readWatcher;
/** @var string onWritable loop watcher. */
private $writeWatcher;
/** @var \SplQueue Queue of pending reads. */
private $reads;
/** @var \SplQueue Queue of pending writes. */
private $writes;
/** @var bool */
private $open = true;
/** @var bool */
private $autoClose = true;
/** @var \Amp\ByteStream\ResourceOutputStream */
private $write;
/**
* @param resource $read Readable stream resource.
* @param resource $write Writable stream resource.
* @param bool $autoClose True to close the stream resources when this object is destroyed, false to leave open.
*
* @throws \Error If a stream resource is not given for $resource.
*/
public function __construct($read, $write, bool $autoClose = true) {
if (!\is_resource($read) || \get_resource_type($read) !== 'stream') {
throw new \Error('Invalid resource given to constructor!');
}
if (!\is_resource($write) || \get_resource_type($write) !== 'stream') {
throw new \Error('Invalid resource given to constructor!');
}
$this->readResource = $read;
$this->writeResource = $write;
$this->autoClose = $autoClose;
\stream_set_blocking($this->readResource, false);
\stream_set_read_buffer($this->readResource, 0);
\stream_set_write_buffer($this->readResource, 0);
if ($this->readResource !== $this->writeResource) {
\stream_set_blocking($this->writeResource, false);
\stream_set_read_buffer($this->writeResource, 0);
\stream_set_write_buffer($this->writeResource, 0);
}
$this->reads = $reads = new \SplQueue;
$this->writes = $writes = new \SplQueue;
$errorHandler = static function ($errno, $errstr, $errfile, $errline) {
if ($errno & \error_reporting()) {
throw new ChannelException(\sprintf(
'Received corrupted data. Errno: %d; %s in file %s on line %d',
$errno,
$errstr,
$errfile,
$errline
));
}
};
$this->readWatcher = Loop::onReadable($this->readResource, static function ($watcher, $stream) use ($reads, $errorHandler) {
while (!$reads->isEmpty()) {
/** @var \Amp\Deferred $deferred */
list($buffer, $length, $deferred) = $reads->shift();
if ($length === 0) {
// Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes.
$data = @\fread($stream, self::HEADER_LENGTH - \strlen($buffer));
if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) {
$deferred->fail(new ChannelException("The socket unexpectedly closed"));
break;
}
$buffer .= $data;
if (\strlen($buffer) !== self::HEADER_LENGTH) {
// Not enough data available.
$reads->unshift([$buffer, 0, $deferred]);
return;
}
$data = \unpack("Cprefix/Llength", $data);
if ($data["prefix"] !== 0) {
$deferred->fail(new ChannelException("Invalid header received"));
break;
}
$length = $data["length"];
$buffer = '';
}
// Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes.
$data = @\fread($stream, $length - \strlen($buffer));
if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) {
$deferred->fail(new ChannelException("The socket unexpectedly closed"));
break;
}
$buffer .= $data;
if (\strlen($buffer) < $length) {
// Not enough data available.
$reads->unshift([$buffer, $length, $deferred]);
return;
}
\set_error_handler($errorHandler);
try {
// Attempt to unserialize the received data.
try {
$data = \unserialize($buffer);
} finally {
\restore_error_handler();
}
$deferred->resolve($data);
} catch (\Throwable $exception) {
$deferred->fail(new SerializationException("Exception thrown when unserializing data", $exception));
}
}
Loop::disable($watcher);
});
$this->writeWatcher = Loop::onWritable($this->writeResource, static function ($watcher, $stream) use ($writes) {
try {
while (!$writes->isEmpty()) {
/** @var \Amp\Deferred $deferred */
list($data, $previous, $deferred) = $writes->shift();
$length = \strlen($data);
if ($length === 0) {
$deferred->resolve(0);
continue;
}
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
$written = @\fwrite($stream, $data);
if ($written === false || $written === 0) {
$message = "Failed to write to socket";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
$exception = new ChannelException($message);
$deferred->fail($exception);
while (!$writes->isEmpty()) {
list(, , $deferred) = $writes->shift();
$deferred->fail($exception);
}
return;
}
if ($length <= $written) {
$deferred->resolve($written + $previous);
continue;
}
$data = \substr($data, $written);
$writes->unshift([$data, $written + $previous, $deferred]);
return;
}
} finally {
if ($writes->isEmpty()) {
Loop::disable($watcher);
}
}
});
Loop::disable($this->readWatcher);
Loop::disable($this->writeWatcher);
}
public function __destruct() {
if ($this->readResource !== null) {
$this->close();
}
public function __construct($read, $write) {
parent::__construct(
$this->read = new ResourceInputStream($read),
$this->write = new ResourceOutputStream($write)
);
}
/**
* {@inheritdoc}
* Closes the read and write resource streams.
*/
public function close() {
if (\is_resource($this->readResource)) {
if ($this->autoClose) {
@\fclose($this->readResource);
if ($this->readResource !== $this->writeResource) {
@\fclose($this->writeResource);
}
}
$this->readResource = null;
$this->writeResource = null;
}
$this->open = false;
if (!$this->reads->isEmpty()) {
$exception = new ChannelException("The connection was unexpectedly closed before reading completed");
do {
/** @var \Amp\Deferred $deferred */
list(, , $deferred) = $this->reads->shift();
$deferred->fail($exception);
} while (!$this->reads->isEmpty());
}
if (!$this->writes->isEmpty()) {
$exception = new ChannelException("The connection was unexpectedly writing completed");
do {
/** @var \Amp\Deferred $deferred */
list(, , $deferred) = $this->writes->shift();
$deferred->fail($exception);
} while (!$this->writes->isEmpty());
}
Loop::cancel($this->readWatcher);
Loop::cancel($this->writeWatcher);
$this->read->close();
$this->write->close();
}
/**
* {@inheritdoc}
*/
public function receive(): Promise {
if (!$this->open) {
return new Failure(new ChannelException("The channel is has been closed"));
}
$deferred = new Deferred;
$this->reads->push(["", 0, $deferred]);
Loop::enable($this->readWatcher);
return $deferred->promise();
}
/**
* @param string $data
* @param bool $end
*
* @return \Amp\Promise
*/
public function send($data): Promise {
if (!$this->open) {
return new Failure(new ChannelException("The channel is has been closed"));
}
// Serialize the data to send into the channel.
try {
$data = \serialize($data);
} catch (\Throwable $exception) {
throw new SerializationException(
"The given data cannot be sent because it is not serializable.", $exception
);
}
$data = \pack("CL", 0, \strlen($data)) . $data;
$length = \strlen($data);
$written = 0;
if ($this->writes->isEmpty()) {
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
$written = @\fwrite($this->writeResource, $data);
if ($written === false) {
$message = "Failed to write to stream";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
return new Failure(new ChannelException($message));
}
if ($length <= $written) {
return new Success($written);
}
$data = \substr($data, $written);
}
$deferred = new Deferred;
$this->writes->push([$data, $written, $deferred]);
Loop::enable($this->writeWatcher);
return $deferred->promise();
}
}
}

View File

@ -4,7 +4,6 @@ namespace Amp\Parallel\Sync;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\Parser;
use Amp\ByteStream\StreamException;
use Amp\Coroutine;
use Amp\Promise;
@ -15,8 +14,6 @@ use Amp\Promise;
* Supports full duplex read and write.
*/
class ChannelledStream implements Channel {
const HEADER_LENGTH = 5;
/** @var \Amp\ByteStream\InputStream */
private $read;
@ -26,7 +23,7 @@ class ChannelledStream implements Channel {
/** @var \SplQueue */
private $received;
/** @var \Amp\ByteStream\Parser */
/** @var \Amp\Parser\Parser */
private $parser;
/**
@ -39,51 +36,7 @@ class ChannelledStream implements Channel {
$this->read = $read;
$this->write = $write;
$this->received = new \SplQueue;
$this->parser = new Parser(self::parser($this->received, static function ($errno, $errstr, $errfile, $errline) {
if ($errno & \error_reporting()) {
throw new ChannelException(\sprintf(
'Received corrupted data. Errno: %d; %s in file %s on line %d',
$errno,
$errstr,
$errfile,
$errline
));
}
}));
}
/**
* @param \SplQueue $queue
* @param callable $errorHandler
*
* @return \Generator
*
* @throws \Amp\Parallel\Sync\ChannelException
* @throws \Amp\Parallel\Sync\SerializationException
*/
private static function parser(\SplQueue $queue, callable $errorHandler): \Generator {
while (true) {
$data = \unpack("Cprefix/Llength", yield self::HEADER_LENGTH);
if ($data["prefix"] !== 0) {
throw new ChannelException("Invalid header received");
}
$data = yield $data["length"];
\set_error_handler($errorHandler);
// Attempt to unserialize the received data.
try {
$data = \unserialize($data);
} catch (\Throwable $exception) {
throw new SerializationException("Exception thrown when unserializing data", $exception);
} finally {
\restore_error_handler();
}
$queue->push($data);
}
$this->parser = new ChannelParser([$this->received, 'push']);
}
/**
@ -94,17 +47,8 @@ class ChannelledStream implements Channel {
}
private function doSend($data): \Generator {
// Serialize the data to send into the channel.
try {
$data = \serialize($data);
} catch (\Throwable $exception) {
throw new SerializationException(
"The given data cannot be sent because it is not serializable.", $exception
);
}
try {
return yield $this->write->write(\pack("CL", 0, \strlen($data)) . $data);
return yield $this->write->write($this->parser->encode($data));
} catch (StreamException $exception) {
throw new ChannelException("Sending on the channel failed. Did the context die?", $exception);
}
@ -124,7 +68,7 @@ class ChannelledStream implements Channel {
}
try {
yield $this->parser->write($chunk);
$this->parser->push($chunk);
} catch (StreamException $exception) {
throw new ChannelException("Reading from the channel failed. Did the context die?", $exception);
}