1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Merge branch 'master' of github.com:amphp/parallel into fifo

This commit is contained in:
Daniil Gentili 2020-02-11 14:45:32 +01:00
commit 59dfc74967
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
11 changed files with 235 additions and 20 deletions

View File

@ -4,14 +4,13 @@ php:
- 7.1
- 7.2
- 7.3
- 7.4snapshot
- 7.4
- nightly
sudo: false
matrix:
allow_failures:
- php: 7.4snapshot
- php: nightly
fast_finish: true
@ -19,15 +18,21 @@ env:
- AMP_DEBUG=true
before_install:
- phpenv config-rm xdebug.ini || echo "No xdebug config."
# xdebug causes hangs on PHP 7.1 and 7.2
- if [ "$TRAVIS_PHP_VERSION" == "7.1" ] || [ "$TRAVIS_PHP_VERSION" == "7.2" ]; then
phpenv config-rm xdebug.ini || echo "No xdebug config.";
fi
install:
- composer update -n --prefer-dist
# pthreads is now only supported on PHP 7.2+
- if [ "$TRAVIS_PHP_VERSION" != "7.1" ]; then
# ext-pthreads is only supported on PHP 7.2 and 7.3
- if [ "$TRAVIS_PHP_VERSION" == "7.2" ] || [ "$TRAVIS_PHP_VERSION" == "7.3" ]; then
travis/install-pthreads.sh;
fi
- travis/install-parallel.sh;
# ext-parallel is only supported on PHP 7.2+
- if [ "$TRAVIS_PHP_VERSION" != "7.1" ]; then
travis/install-parallel.sh;
fi
- wget https://github.com/php-coveralls/php-coveralls/releases/download/v1.0.2/coveralls.phar
- chmod +x coveralls.phar

View File

@ -38,6 +38,8 @@
"Amp\\Parallel\\": "lib"
},
"files": [
"lib/Context/functions.php",
"lib/Sync/functions.php",
"lib/Worker/functions.php"
]
},

View File

@ -3,6 +3,7 @@
namespace Amp\Parallel\Context;
use Amp\Loop;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitFailure;
use Amp\Parallel\Sync\ExitResult;
@ -10,6 +11,7 @@ use Amp\Parallel\Sync\ExitSuccess;
use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\Sync\SynchronizationError;
use Amp\Promise;
use Amp\TimeoutException;
use parallel\Runtime;
use function Amp\call;
@ -341,16 +343,20 @@ final class Parallel implements Context
public function receive(): Promise
{
if ($this->channel === null) {
throw new StatusError('The process has not been started.');
throw new StatusError('The thread has not been started.');
}
return call(function (): \Generator {
try {
$data = yield $this->channel->receive();
} catch (ChannelException $e) {
throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
if ($data instanceof ExitResult) {
$data = $data->getResult();
throw new SynchronizationError(\sprintf(
'Thread process unexpectedly exited with result of type: %s',
'Thread unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
));
}
@ -372,7 +378,27 @@ final class Parallel implements Context
throw new \Error('Cannot send exit result objects.');
}
return $this->channel->send($data);
return call(function () use ($data): \Generator {
try {
return yield $this->channel->send($data);
} catch (ChannelException $e) {
if ($this->channel === null) {
throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
try {
$data = yield Promise\timeout($this->join(), 100);
} catch (ContextException | ChannelException | TimeoutException $ex) {
$this->kill();
throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
throw new SynchronizationError(\sprintf(
'Thread unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
), 0, $e);
}
});
}
/**

View File

@ -10,6 +10,7 @@ use Amp\Process\Process as BaseProcess;
use Amp\Process\ProcessInputStream;
use Amp\Process\ProcessOutputStream;
use Amp\Promise;
use Amp\TimeoutException;
use function Amp\call;
final class Process implements Context
@ -222,7 +223,7 @@ final class Process implements Context
try {
$data = yield $this->channel->receive();
} catch (ChannelException $e) {
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
if ($data instanceof ExitResult) {
@ -250,7 +251,29 @@ final class Process implements Context
throw new \Error("Cannot send exit result objects");
}
return $this->channel->send($data);
return call(function () use ($data): \Generator {
try {
return yield $this->channel->send($data);
} catch (ChannelException $e) {
if ($this->channel === null) {
throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
try {
$data = yield Promise\timeout($this->join(), 100);
} catch (ContextException | ChannelException | TimeoutException $ex) {
if ($this->isRunning()) {
$this->kill();
}
throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
throw new SynchronizationError(\sprintf(
'Process unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
), 0, $e);
}
});
}
/**

33
lib/Context/functions.php Normal file
View File

@ -0,0 +1,33 @@
<?php
namespace Amp\Parallel\Context;
use Amp\Promise;
/**
* @param string|string[] $script
*
* @return Context
*/
function create($script): Context
{
if (Parallel::isSupported()) {
return new Parallel($script);
}
return new Process($script);
}
/**
* @param string|string[] $script
*
* @return Promise<Context>
*/
function run($script): Promise
{
if (Parallel::isSupported()) {
return Parallel::run($script);
}
return Process::run($script);
}

View File

@ -13,7 +13,7 @@ final class ExitFailure implements ExitResult
/** @var int|string */
private $code;
/** @var array */
/** @var string[] */
private $trace;
/** @var self|null */
@ -24,7 +24,7 @@ final class ExitFailure implements ExitResult
$this->type = \get_class($exception);
$this->message = $exception->getMessage();
$this->code = $exception->getCode();
$this->trace = $exception->getTraceAsString();
$this->trace = flattenThrowableBacktrace($exception);
if ($previous = $exception->getPrevious()) {
$this->previous = new self($previous);
@ -53,7 +53,7 @@ final class ExitFailure implements ExitResult
$this->code,
PanicError::class
),
$this->trace,
\implode("\n", $this->trace),
$previous
);
}

75
lib/Sync/functions.php Normal file
View File

@ -0,0 +1,75 @@
<?php
namespace Amp\Parallel\Sync;
/**
* @param \Throwable $exception
*
* @return string[] Serializable array of strings representing the exception backtrace including function arguments.
*/
function flattenThrowableBacktrace(\Throwable $exception): array
{
$output = [];
$counter = 0;
$trace = $exception->getTrace();
foreach ($trace as $call) {
if (isset($call['class'])) {
$name = $call['class'] . $call['type'] . $call['function'];
} else {
$name = $call['function'];
}
$args = \implode(', ', \array_map(__NAMESPACE__ . '\\flattenArgument', $call['args']));
$output[] = \sprintf(
'#%d %s(%d): %s(%s)',
$counter++,
$call['file'] ?? '[internal function]',
$call['line'] ?? 0,
$name,
$args
);
}
return $output;
}
/**
* @param mixed $value
*
* @return string Serializable string representation of $value for backtraces.
*/
function flattenArgument($value): string
{
if ($value instanceof \Closure) {
$closureReflection = new \ReflectionFunction($value);
return \sprintf(
'Closure(%s:%s)',
$closureReflection->getFileName(),
$closureReflection->getStartLine()
);
}
if (\is_object($value)) {
return \sprintf('Object(%s)', \get_class($value));
}
if (\is_array($value)) {
return 'Array([' . \implode(', ', \array_map(__FUNCTION__, $value)) . '])';
}
if (\is_resource($value)) {
return \sprintf('Resource(%s)', \get_resource_type($value));
}
if (\is_string($value)) {
return '"' . $value . '"';
}
if (\is_null($value)) {
return 'null';
}
return (string) $value;
}

View File

@ -3,6 +3,7 @@
namespace Amp\Parallel\Worker\Internal;
use Amp\Failure;
use Amp\Parallel\Sync;
use Amp\Parallel\Worker\TaskError;
use Amp\Parallel\Worker\TaskException;
use Amp\Promise;
@ -25,7 +26,7 @@ final class TaskFailure extends TaskResult
/** @var int|string */
private $code;
/** @var array */
/** @var string[] */
private $trace;
/** @var self|null */
@ -38,7 +39,7 @@ final class TaskFailure extends TaskResult
$this->parent = $exception instanceof \Error ? self::PARENT_ERROR : self::PARENT_EXCEPTION;
$this->message = $exception->getMessage();
$this->code = $exception->getCode();
$this->trace = $exception->getTraceAsString();
$this->trace = Sync\flattenThrowableBacktrace($exception);
if ($previous = $exception->getPrevious()) {
$this->previous = new self($id, $previous);
@ -61,7 +62,7 @@ final class TaskFailure extends TaskResult
return new TaskError(
$this->type,
\sprintf($format, $this->type, $this->message, $this->code, TaskError::class),
$this->trace,
\implode("\n", $this->trace),
$previous
);
}
@ -69,7 +70,7 @@ final class TaskFailure extends TaskResult
return new TaskException(
$this->type,
\sprintf($format, $this->type, $this->message, $this->code, TaskException::class),
$this->trace,
\implode("\n", $this->trace),
$previous
);
}

View File

@ -32,6 +32,27 @@ abstract class AbstractContextTest extends AsyncTestCase
yield $context->join();
}
public function testThrowingProcessOnReceive()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('Test message');
$context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php");
yield $context->start();
yield $context->receive();
}
public function testThrowingProcessOnSend()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('Test message');
$context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php");
yield $context->start();
yield new Delayed(100);
yield $context->send(1);
}
public function testInvalidScriptPath()
{
$this->expectException(PanicError::class);
@ -120,4 +141,25 @@ abstract class AbstractContextTest extends AsyncTestCase
yield $context->start();
yield $context->join();
}
public function testExitingProcessOnReceive()
{
$this->expectException(ContextException::class);
$this->expectExceptionMessage('stopped responding');
$context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php");
yield $context->start();
yield $context->receive();
}
public function testExitingProcessOnSend()
{
$this->expectException(ContextException::class);
$this->expectExceptionMessage('stopped responding');
$context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php");
yield $context->start();
yield new Delayed(500);
yield $context->send(1);
}
}

View File

@ -0,0 +1,8 @@
<?php
use Amp\Parallel\Sync\Channel;
use Amp\PHPUnit\TestException;
return function (Channel $channel) use ($argv) {
throw new TestException('Test message');
};

View File

@ -154,7 +154,7 @@ abstract class AbstractWorkerTest extends AsyncTestCase
public function testKill()
{
$this->setTimeout(250);
$this->setTimeout(500);
$worker = $this->createWorker();