1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Merge branch 'v1.x'

This commit is contained in:
Aaron Piotrowski 2016-01-25 23:02:22 -06:00
commit 8649fd7be8
2 changed files with 47 additions and 24 deletions

View File

@ -1,4 +1,14 @@
<?php
namespace Icicle\Concurrent\Exception;
class WorkerException extends \Exception implements Exception {}
class WorkerException extends \Exception implements Exception
{
/**
* @param string $message
* @param \Throwable|null $previous
*/
public function __construct(string $message, \Throwable $previous = null)
{
parent::__construct($message, 0, $previous);
}
}

View File

@ -5,6 +5,7 @@ use Icicle\Awaitable\Delayed;
use Icicle\Concurrent\Strand;
use Icicle\Concurrent\Exception\{StatusError, WorkerException};
use Icicle\Concurrent\Worker\Internal\TaskFailure;
use Icicle\Coroutine\Coroutine;
/**
* Base class for most common types of task workers.
@ -16,27 +17,21 @@ abstract class AbstractWorker implements Worker
*/
private $context;
/**
* @var bool
*/
private $idle = true;
/**
* @var bool
*/
private $shutdown = false;
/**
* @var \Icicle\Awaitable\Delayed
* @var \Icicle\Coroutine\Coroutine
*/
private $activeDelayed;
private $active;
/**
* @var \SplQueue
*/
private $busyQueue;
/**
* @param \Icicle\Concurrent\Strand $strand
*/
@ -59,7 +54,7 @@ abstract class AbstractWorker implements Worker
*/
public function isIdle(): bool
{
return $this->idle;
return null === $this->active;
}
/**
@ -84,27 +79,26 @@ abstract class AbstractWorker implements Worker
}
// If the worker is currently busy, store the task in a busy queue.
if (!$this->idle) {
if (null !== $this->active) {
$delayed = new Delayed();
$this->busyQueue->enqueue($delayed);
yield $delayed;
}
$this->idle = false;
$this->activeDelayed = new Delayed();
$this->active = new Coroutine($this->send($task));
try {
yield from $this->context->send($task);
$result = yield from $this->context->receive();
$result = yield $this->active;
} catch (\Throwable $exception) {
$this->kill();
throw new WorkerException('Sending the task to the worker failed.', $exception);
} finally {
$this->idle = true;
$this->activeDelayed->resolve();
$this->active = null;
}
// We're no longer busy at the moment, so dequeue a waiting task.
if (!$this->busyQueue->isEmpty()) {
$this->busyQueue->dequeue()->resolve();
}
// We're no longer busy at the moment, so dequeue a waiting task.
if (!$this->busyQueue->isEmpty()) {
$this->busyQueue->dequeue()->resolve();
}
if ($result instanceof TaskFailure) {
@ -114,6 +108,21 @@ abstract class AbstractWorker implements Worker
return $result;
}
/**
* @coroutine
*
* @param \Icicle\Concurrent\Worker\Task $task
*
* @return \Generator
*
* @resolve mixed
*/
private function send(Task $task): \Generator
{
yield from $this->context->send($task);
return yield from $this->context->receive();
}
/**
* {@inheritdoc}
*/
@ -129,8 +138,12 @@ abstract class AbstractWorker implements Worker
$this->cancelPending();
// If a task is currently running, wait for it to finish.
if (!$this->idle) {
yield $this->activeDelayed;
if (null !== $this->active) {
try {
yield $this->active;
} catch (\Throwable $exception) {
// Ignore failure in this context.
}
}
yield from $this->context->send(0);