mirror of
https://github.com/danog/parallel.git
synced 2025-01-22 14:01:14 +01:00
Reorganize exception classes and add TaskError
TaskError is thrown if the exception thrown in the worker was an instance of Error.
This commit is contained in:
parent
70bed8ec57
commit
c7294da60d
@ -2,19 +2,11 @@
|
||||
|
||||
namespace Amp\Parallel\Forking;
|
||||
|
||||
use function Amp\call;
|
||||
use Amp\{ Coroutine, Loop, Promise };
|
||||
use Amp\Parallel\{
|
||||
ContextException,
|
||||
ChannelException,
|
||||
Process,
|
||||
SerializationException,
|
||||
StatusError,
|
||||
Strand,
|
||||
SynchronizationError
|
||||
};
|
||||
use Amp\Parallel\Sync\{ Channel, ChannelledSocket };
|
||||
use Amp\Parallel\{ ContextException, Process, StatusError, Strand, SynchronizationError };
|
||||
use Amp\Parallel\Sync\{ Channel, ChannelException, ChannelledSocket, SerializationException };
|
||||
use Amp\Parallel\Sync\Internal\{ ExitFailure, ExitResult, ExitSuccess };
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* Implements a UNIX-compatible context using forked processes.
|
||||
|
@ -5,14 +5,13 @@ namespace Amp\Parallel\Process;
|
||||
use function Amp\call;
|
||||
use Amp\{ Coroutine, Promise };
|
||||
use Amp\Parallel\{
|
||||
ChannelException,
|
||||
ContextException,
|
||||
Process as ProcessContext,
|
||||
StatusError,
|
||||
Strand,
|
||||
SynchronizationError
|
||||
};
|
||||
use Amp\Parallel\Sync\{ ChannelledSocket, Internal\ExitResult };
|
||||
use Amp\Parallel\Sync\{ ChannelException, ChannelledSocket, Internal\ExitResult };
|
||||
use Amp\Process\Process;
|
||||
|
||||
class ChannelledProcess implements ProcessContext, Strand {
|
||||
|
@ -1,6 +1,6 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel;
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
class ChannelException extends \Exception {
|
||||
public function __construct(string $message, \Throwable $previous = null) {
|
@ -3,7 +3,6 @@
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\{ Deferred, Failure, Loop, Promise, Success };
|
||||
use Amp\Parallel\{ ChannelException, SerializationException };
|
||||
|
||||
class ChannelledSocket implements Channel {
|
||||
const HEADER_LENGTH = 5;
|
||||
|
@ -3,10 +3,7 @@
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\{ Coroutine, Promise };
|
||||
use Amp\ByteStream\{
|
||||
InputStream, OutputStream, Parser, ReadableStream, StreamException, WritableStream
|
||||
};
|
||||
use Amp\Parallel\{ ChannelException, SerializationException };
|
||||
use Amp\ByteStream\{ InputStream, OutputStream, Parser, StreamException };
|
||||
|
||||
/**
|
||||
* An asynchronous channel for sending data between threads and processes.
|
||||
@ -57,8 +54,8 @@ class ChannelledStream implements Channel {
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @throws \Amp\Parallel\ChannelException
|
||||
* @throws \Amp\Parallel\SerializationException
|
||||
* @throws \Amp\Parallel\Sync\ChannelException
|
||||
* @throws \Amp\Parallel\Sync\SerializationException
|
||||
*/
|
||||
private static function parser(\SplQueue $queue, callable $errorHandler): \Generator {
|
||||
while (true) {
|
||||
|
@ -2,8 +2,6 @@
|
||||
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\Parallel\LockAlreadyReleasedError;
|
||||
|
||||
/**
|
||||
* A handle on an acquired lock from a synchronization object.
|
||||
*
|
||||
|
@ -1,5 +1,5 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel;
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
class LockAlreadyReleasedError extends \Error {}
|
@ -1,5 +1,5 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel;
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
class MutexException extends \Exception {}
|
@ -4,7 +4,6 @@ namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\Coroutine;
|
||||
use Amp\Delayed;
|
||||
use Amp\Parallel\SemaphoreException;
|
||||
use Amp\Promise;
|
||||
|
||||
/**
|
||||
|
@ -1,5 +1,5 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel;
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
class SemaphoreException extends \Exception {}
|
@ -1,5 +1,5 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel;
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
class SerializationException extends ChannelException {}
|
@ -1,5 +1,5 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel;
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
class SharedMemoryException extends \Exception {}
|
@ -3,7 +3,6 @@
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\{ Coroutine, Promise };
|
||||
use Amp\Parallel\SharedMemoryException;
|
||||
|
||||
/**
|
||||
* A container object for sharing a value across contexts.
|
||||
|
@ -3,8 +3,14 @@
|
||||
namespace Amp\Parallel\Threading\Internal;
|
||||
|
||||
use Amp\{ Coroutine, Loop, Promise };
|
||||
use Amp\Parallel\{ ChannelException, SerializationException };
|
||||
use Amp\Parallel\Sync\{ Channel, ChannelledSocket, Internal\ExitFailure, Internal\ExitSuccess };
|
||||
use Amp\Parallel\Sync\{
|
||||
Channel,
|
||||
ChannelException,
|
||||
ChannelledSocket,
|
||||
Internal\ExitFailure,
|
||||
Internal\ExitSuccess,
|
||||
SerializationException
|
||||
};
|
||||
|
||||
/**
|
||||
* An internal thread that executes a given function concurrently.
|
||||
|
@ -4,8 +4,8 @@ namespace Amp\Parallel\Threading;
|
||||
|
||||
use function Amp\call;
|
||||
use Amp\{ Coroutine, Loop, Promise };
|
||||
use Amp\Parallel\{ ChannelException, ContextException, StatusError, SynchronizationError, Strand };
|
||||
use Amp\Parallel\Sync\{ ChannelledSocket, Internal\ExitResult };
|
||||
use Amp\Parallel\{ ContextException, StatusError, SynchronizationError, Strand };
|
||||
use Amp\Parallel\Sync\{ ChannelException, ChannelledSocket, Internal\ExitResult };
|
||||
|
||||
/**
|
||||
* Implements an execution context using native multi-threading.
|
||||
|
@ -3,7 +3,7 @@
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
use Amp\{ Coroutine, Deferred, Promise };
|
||||
use Amp\Parallel\{ StatusError, Strand, WorkerException} ;
|
||||
use Amp\Parallel\{ StatusError, Strand } ;
|
||||
use Amp\Parallel\Worker\Internal\{ Job, TaskResult };
|
||||
|
||||
/**
|
||||
@ -100,8 +100,9 @@ abstract class AbstractWorker implements Worker {
|
||||
*
|
||||
* @return \Generator
|
||||
* @throws \Amp\Parallel\StatusError
|
||||
* @throws \Amp\Parallel\TaskException
|
||||
* @throws \Amp\Parallel\WorkerException
|
||||
* @throws \Amp\Parallel\Worker\TaskException
|
||||
* @throws \Amp\Parallel\Worker\TaskError
|
||||
* @throws \Amp\Parallel\Worker\WorkerException
|
||||
*/
|
||||
private function doEnqueue(Task $task): \Generator {
|
||||
if (empty($this->jobQueue)) {
|
||||
|
@ -3,13 +3,20 @@
|
||||
namespace Amp\Parallel\Worker\Internal;
|
||||
|
||||
use Amp\Failure;
|
||||
use Amp\Parallel\TaskException;
|
||||
use Amp\Parallel\Worker\TaskError;
|
||||
use Amp\Parallel\Worker\TaskException;
|
||||
use Amp\Promise;
|
||||
|
||||
class TaskFailure extends TaskResult {
|
||||
const PARENT_EXCEPTION = 0;
|
||||
const PARENT_ERROR = 1;
|
||||
|
||||
/** @var string */
|
||||
private $type;
|
||||
|
||||
/** @var int */
|
||||
private $parent;
|
||||
|
||||
/** @var string */
|
||||
private $message;
|
||||
|
||||
@ -22,17 +29,32 @@ class TaskFailure extends TaskResult {
|
||||
public function __construct(string $id, \Throwable $exception) {
|
||||
parent::__construct($id);
|
||||
$this->type = \get_class($exception);
|
||||
$this->parent = $exception instanceof \Error ? self::PARENT_ERROR : self::PARENT_EXCEPTION;
|
||||
$this->message = $exception->getMessage();
|
||||
$this->code = $exception->getCode();
|
||||
$this->trace = $exception->getTraceAsString();
|
||||
}
|
||||
|
||||
public function promise(): Promise {
|
||||
return new Failure(new TaskException(
|
||||
$this->type,
|
||||
sprintf('Uncaught exception in worker of type "%s" with message "%s"', $this->type, $this->message),
|
||||
$this->code,
|
||||
$this->trace
|
||||
));
|
||||
switch ($this->parent) {
|
||||
case self::PARENT_ERROR:
|
||||
$exception = new TaskError(
|
||||
$this->type,
|
||||
sprintf('Uncaught Error in worker of type "%s" with message "%s"', $this->type, $this->message),
|
||||
$this->code,
|
||||
$this->trace
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
$exception = new TaskException(
|
||||
$this->type,
|
||||
sprintf('Uncaught Exception in worker of type "%s" with message "%s"', $this->type, $this->message),
|
||||
$this->code,
|
||||
$this->trace
|
||||
);
|
||||
}
|
||||
|
||||
return new Failure($exception);
|
||||
}
|
||||
}
|
41
lib/Worker/TaskError.php
Normal file
41
lib/Worker/TaskError.php
Normal file
@ -0,0 +1,41 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
class TaskError extends \Error {
|
||||
/** @var string Class name of error thrown from task. */
|
||||
private $name;
|
||||
|
||||
/** @var string Stack trace of the error thrown from task. */
|
||||
private $trace;
|
||||
|
||||
/**
|
||||
* @param string $name The exception class name.
|
||||
* @param string $message The panic message.
|
||||
* @param int $code The panic code.
|
||||
* @param string $trace The panic stack trace.
|
||||
*/
|
||||
public function __construct(string $name, string $message = '', int $code = 0, string $trace = '') {
|
||||
parent::__construct($message, $code);
|
||||
$this->name = $name;
|
||||
$this->trace = $trace;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the class name of the error thrown from the task.
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getName(): string {
|
||||
return $this->name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the stack trace at the point the error was thrown in the task.
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getWorkerTrace(): string {
|
||||
return $this->trace;
|
||||
}
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel;
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
class TaskException extends \Exception {
|
||||
/** @var string Class name of exception thrown from task. */
|
||||
@ -10,8 +10,6 @@ class TaskException extends \Exception {
|
||||
private $trace;
|
||||
|
||||
/**
|
||||
* Creates a new panic error.
|
||||
*
|
||||
* @param string $name The exception class name.
|
||||
* @param string $message The panic message.
|
||||
* @param int $code The panic code.
|
@ -1,6 +1,6 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel;
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
class WorkerException extends \Exception {
|
||||
/**
|
@ -59,7 +59,7 @@ class ChannelledSocketTest extends TestCase {
|
||||
|
||||
/**
|
||||
* @depends testSendReceive
|
||||
* @expectedException \Amp\Parallel\ChannelException
|
||||
* @expectedException \Amp\Parallel\Sync\ChannelException
|
||||
*/
|
||||
public function testInvalidDataReceived() {
|
||||
Loop::run(function () {
|
||||
@ -75,7 +75,7 @@ class ChannelledSocketTest extends TestCase {
|
||||
|
||||
/**
|
||||
* @depends testSendReceive
|
||||
* @expectedException \Amp\Parallel\ChannelException
|
||||
* @expectedException \Amp\Parallel\Sync\ChannelException
|
||||
*/
|
||||
public function testSendUnserializableData() {
|
||||
Loop::run(function () {
|
||||
@ -92,7 +92,7 @@ class ChannelledSocketTest extends TestCase {
|
||||
|
||||
/**
|
||||
* @depends testSendReceive
|
||||
* @expectedException \Amp\Parallel\ChannelException
|
||||
* @expectedException \Amp\Parallel\Sync\ChannelException
|
||||
*/
|
||||
public function testSendAfterClose() {
|
||||
Loop::run(function () {
|
||||
@ -107,7 +107,7 @@ class ChannelledSocketTest extends TestCase {
|
||||
|
||||
/**
|
||||
* @depends testSendReceive
|
||||
* @expectedException \Amp\Parallel\ChannelException
|
||||
* @expectedException \Amp\Parallel\Sync\ChannelException
|
||||
*/
|
||||
public function testReceiveAfterClose() {
|
||||
Loop::run(function () {
|
||||
|
@ -78,7 +78,7 @@ class ChannelledStreamTest extends TestCase {
|
||||
|
||||
/**
|
||||
* @depends testSendReceive
|
||||
* @expectedException \Amp\Parallel\ChannelException
|
||||
* @expectedException \Amp\Parallel\Sync\ChannelException
|
||||
*/
|
||||
public function testInvalidDataReceived() {
|
||||
Loop::run(function () {
|
||||
@ -95,7 +95,7 @@ class ChannelledStreamTest extends TestCase {
|
||||
|
||||
/**
|
||||
* @depends testSendReceive
|
||||
* @expectedException \Amp\Parallel\ChannelException
|
||||
* @expectedException \Amp\Parallel\Sync\ChannelException
|
||||
*/
|
||||
public function testSendUnserializableData() {
|
||||
Loop::run(function () {
|
||||
@ -112,7 +112,7 @@ class ChannelledStreamTest extends TestCase {
|
||||
|
||||
/**
|
||||
* @depends testSendReceive
|
||||
* @expectedException \Amp\Parallel\ChannelException
|
||||
* @expectedException \Amp\Parallel\Sync\ChannelException
|
||||
*/
|
||||
public function testSendAfterClose() {
|
||||
Loop::run(function () {
|
||||
@ -134,7 +134,7 @@ class ChannelledStreamTest extends TestCase {
|
||||
|
||||
/**
|
||||
* @depends testSendReceive
|
||||
* @expectedException \Amp\Parallel\ChannelException
|
||||
* @expectedException \Amp\Parallel\Sync\ChannelException
|
||||
*/
|
||||
public function testReceiveAfterClose() {
|
||||
Loop::run(function () {
|
||||
|
@ -19,7 +19,7 @@ class LockTest extends TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Amp\Parallel\LockAlreadyReleasedError
|
||||
* @expectedException \Amp\Parallel\Sync\LockAlreadyReleasedError
|
||||
*/
|
||||
public function testThrowsOnMultiRelease() {
|
||||
$lock = new Lock($this->createCallback(1));
|
||||
|
@ -35,7 +35,7 @@ class SharedMemoryParcelTest extends AbstractParcelTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Amp\Parallel\SharedMemoryException
|
||||
* @expectedException \Amp\Parallel\Sync\SharedMemoryException
|
||||
*/
|
||||
public function testUnwrapThrowsErrorIfFreed() {
|
||||
$object = new SharedMemoryParcel(new \stdClass());
|
||||
|
Loading…
x
Reference in New Issue
Block a user