1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Keep track of times pulled in DefaultQueue; add more tests

This commit is contained in:
Aaron Piotrowski 2015-12-16 17:43:14 -06:00
parent 78b8741537
commit c08f6c9058
2 changed files with 101 additions and 25 deletions

View File

@ -33,6 +33,11 @@ class DefaultQueue implements Queue
*/
private $busy;
/**
* @var \SplObjectStorage
*/
private $workers;
/**
* @var bool
*/
@ -63,6 +68,7 @@ class DefaultQueue implements Queue
$this->factory = $factory ?: new DefaultWorkerFactory();
$this->minSize = $minSize;
$this->maxSize = $maxSize;
$this->workers = new \SplObjectStorage();
$this->idle = new \SplQueue();
$this->busy = new \SplQueue();
}
@ -89,6 +95,7 @@ class DefaultQueue implements Queue
while (--$count >= 0) {
$worker = $this->factory->create();
$worker->start();
$this->workers->attach($worker, 0);
$this->idle->push($worker);
}
@ -113,6 +120,7 @@ class DefaultQueue implements Queue
// Max worker count has not been reached, so create another worker.
$worker = $this->factory->create();
$worker->start();
$this->workers->attach($worker, 0);
}
} else {
// Shift a worker off the idle queue.
@ -121,6 +129,7 @@ class DefaultQueue implements Queue
} while (!$worker->isRunning());
$this->busy->push($worker);
$this->workers[$worker] += 1;
return $worker;
}
@ -134,23 +143,23 @@ class DefaultQueue implements Queue
throw new StatusError('The queue is not running.');
}
$throw = true;
foreach ($this->busy as $key => $busy) {
if ($busy === $worker) {
$throw = false;
unset($this->busy[$key]);
break;
}
}
if ($throw) {
if (!$this->workers->contains($worker)) {
throw new InvalidArgumentError(
'The provided worker was not part of this queue or was already pushed back into the queue.'
);
}
$this->idle->push($worker);
if (0 === ($this->workers[$worker] -= 1)) {
// Worker is completely idle, remove from busy queue and add to idle queue.
foreach ($this->busy as $key => $busy) {
if ($busy === $worker) {
unset($this->busy[$key]);
break;
}
}
$this->idle->push($worker);
}
}
/**
@ -198,13 +207,7 @@ class DefaultQueue implements Queue
$shutdowns = [];
foreach ($this->idle as $worker) {
if ($worker->isRunning()) {
$shutdowns[] = new Coroutine($worker->shutdown());
}
}
foreach ($this->busy as $worker) {
foreach ($this->workers as $worker) {
if ($worker->isRunning()) {
$shutdowns[] = new Coroutine($worker->shutdown());
}
@ -222,11 +225,7 @@ class DefaultQueue implements Queue
{
$this->running = false;
foreach ($this->idle as $worker) {
$worker->kill();
}
foreach ($this->busy as $worker) {
foreach ($this->workers as $worker) {
$worker->kill();
}
}

View File

@ -81,12 +81,17 @@ abstract class AbstractQueueTest extends TestCase
$queue->push($worker2);
$this->assertNotSame($worker1, $worker2);
$this->assertSame(2, $queue->getWorkerCount());
$this->assertSame(2, $queue->getIdleWorkerCount());
$worker3 = $queue->pull();
$this->assertSame($worker1, $worker3);
$this->assertSame(1, $queue->getIdleWorkerCount());
$worker4 = $queue->pull();
$this->assertSame($worker2, $worker4);
$this->assertSame(0, $queue->getIdleWorkerCount());
$this->assertSame(2, $queue->getWorkerCount());
yield $queue->shutdown();
});
@ -103,10 +108,15 @@ abstract class AbstractQueueTest extends TestCase
$worker1 = $queue->pull();
$worker2 = $queue->pull();
$this->assertSame(2, $queue->getWorkerCount());
$this->assertSame(0, $queue->getIdleWorkerCount());
$queue->push($worker2);
$this->assertSame(1, $queue->getIdleWorkerCount());
$worker3 = $queue->pull();
$this->assertSame(0, $queue->getIdleWorkerCount());
$this->assertSame(2, $queue->getWorkerCount());
$this->assertSame($worker2, $worker3);
yield $queue->shutdown();
@ -116,7 +126,7 @@ abstract class AbstractQueueTest extends TestCase
/**
* @depends testPullPushIsCyclical
*/
public function testPullReturnsFirstBusyWhenAllBusy()
public function testPullReturnsFirstBusyWhenAllBusyAndAtMax()
{
Coroutine\run(function () {
$queue = $this->createQueue(2, 2);
@ -139,6 +149,73 @@ abstract class AbstractQueueTest extends TestCase
});
}
/**
* @depends testPullReturnsFirstBusyWhenAllBusyAndAtMax
*/
public function testPullSpawnsNewWorkerWhenAllOthersBusyAndBelowMax()
{
Coroutine\run(function () {
$queue = $this->createQueue(2, 4);
$queue->start();
$worker1 = $queue->pull();
$worker2 = $queue->pull();
$this->assertSame(2, $queue->getWorkerCount());
$worker3 = $queue->pull();
$this->assertSame(3, $queue->getWorkerCount());
$this->assertNotSame($worker1, $worker3);
$this->assertNotSame($worker2, $worker3);
$worker4 = $queue->pull();
$this->assertSame(4, $queue->getWorkerCount());
$this->assertNotSame($worker1, $worker4);
$this->assertNotSame($worker2, $worker4);
$this->assertNotSame($worker3, $worker4);
$worker5 = $queue->pull();
$this->assertSame(4, $queue->getWorkerCount());
$this->assertSame($worker1, $worker5);
yield $queue->shutdown();
});
}
/**
* @depends testPullPushIsCyclical
*/
public function testPushOnlyMarksIdleAfterPushesEqualPulls()
{
Coroutine\run(function () {
$queue = $this->createQueue(2, 2);
$queue->start();
$worker1 = $queue->pull();
$worker2 = $queue->pull();
$worker3 = $queue->pull();
$this->assertSame($worker1, $worker3);
// Should only mark $worker2 as idle, not $worker3 even though it's pushed first.
$queue->push($worker3);
$queue->push($worker2);
// Should pull $worker2 again.
$worker4 = $queue->pull();
$this->assertSame($worker2, $worker4);
// Pushing $worker1 first, which should now be marked as idle (and so should $worker2/4)
$queue->push($worker1);
$queue->push($worker4);
// Should pull $worker1 now since it was marked idle.
$worker5 = $queue->pull();
$this->assertSame($worker1, $worker5);
yield $queue->shutdown();
});
}
public function testPushForeignWorker()
{
Coroutine\run(function () {