mirror of
https://github.com/danog/parallel.git
synced 2025-01-22 14:01:14 +01:00
Improve cancellation of pending tasks; remove worker from pool if stopped
This commit is contained in:
parent
aac99ed603
commit
dc41c173b9
@ -152,10 +152,12 @@ abstract class AbstractWorker implements Worker
|
||||
*/
|
||||
private function cancelPending()
|
||||
{
|
||||
$exception = new WorkerException('Worker was shut down.');
|
||||
if (!$this->busyQueue->isEmpty()) {
|
||||
$exception = new WorkerException('Worker was shut down.');
|
||||
|
||||
while (!$this->busyQueue->isEmpty()) {
|
||||
$this->busyQueue->dequeue()->cancel($exception);
|
||||
do {
|
||||
$this->busyQueue->dequeue()->cancel($exception);
|
||||
} while (!$this->busyQueue->isEmpty());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,10 +2,8 @@
|
||||
namespace Icicle\Concurrent\Worker;
|
||||
|
||||
use Icicle\Awaitable;
|
||||
use Icicle\Awaitable\Delayed;
|
||||
use Icicle\Exception\InvalidArgumentError;
|
||||
use Icicle\Concurrent\Exception\StatusError;
|
||||
use Icicle\Concurrent\Exception\WorkerException;
|
||||
use Icicle\Coroutine\Coroutine;
|
||||
|
||||
/**
|
||||
@ -269,7 +267,13 @@ class DefaultPool implements Pool
|
||||
// Shift a worker off the idle queue.
|
||||
$worker = $this->idleWorkers->shift();
|
||||
}
|
||||
} while (!$worker->isRunning());
|
||||
|
||||
if ($worker->isRunning()) {
|
||||
break;
|
||||
}
|
||||
|
||||
$this->workers->detach($worker);
|
||||
} while (true);
|
||||
|
||||
$this->busyQueue->push($worker);
|
||||
$this->workers[$worker] += 1;
|
||||
|
Loading…
x
Reference in New Issue
Block a user