1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-27 04:44:56 +01:00

Add queue and factory functions

This commit is contained in:
Aaron Piotrowski 2016-01-11 09:32:06 -06:00
parent 7dce2d449d
commit 1d9804995f
3 changed files with 65 additions and 9 deletions

View File

@ -80,8 +80,8 @@ class DefaultPool implements Pool
$this->maxSize = $maxSize; $this->maxSize = $maxSize;
$this->minSize = $minSize; $this->minSize = $minSize;
// Create the default factory if none is given. // Use the global factory if none is given.
$this->factory = $factory ?: new DefaultWorkerFactory(); $this->factory = $factory ?: factory();
$this->workers = new \SplObjectStorage(); $this->workers = new \SplObjectStorage();
$this->idleWorkers = new \SplQueue(); $this->idleWorkers = new \SplQueue();

View File

@ -65,7 +65,7 @@ class DefaultQueue implements Queue
throw new InvalidArgumentError('Maximum size must be a non-negative integer at least '.$minSize.'.'); throw new InvalidArgumentError('Maximum size must be a non-negative integer at least '.$minSize.'.');
} }
$this->factory = $factory ?: new DefaultWorkerFactory(); $this->factory = $factory ?: factory();
$this->minSize = $minSize; $this->minSize = $minSize;
$this->maxSize = $maxSize; $this->maxSize = $maxSize;
$this->workers = new \SplObjectStorage(); $this->workers = new \SplObjectStorage();

View File

@ -29,7 +29,7 @@ if (!function_exists(__NAMESPACE__ . '\pool')) {
/** /**
* @coroutine * @coroutine
* *
* Enqueues a task to be executed by the worker pool. * Enqueues a task to be executed by the global worker pool.
* *
* @param \Icicle\Concurrent\Worker\Task $task The task to enqueue. * @param \Icicle\Concurrent\Worker\Task $task The task to enqueue.
* *
@ -43,11 +43,25 @@ if (!function_exists(__NAMESPACE__ . '\pool')) {
} }
/** /**
* @param \Icicle\Concurrent\Worker\WorkerFactory|null $factory * Creates a worker using the global worker factory.
* *
* @return \Icicle\Concurrent\Worker\Worker * @return \Icicle\Concurrent\Worker\Worker
*/ */
function create(WorkerFactory $factory = null) function create()
{
$worker = factory()->create();
$worker->start();
return $worker;
}
/**
* Gets or sets the global worker factory.
*
* @param \Icicle\Concurrent\Worker\WorkerFactory|null $factory
*
* @return \Icicle\Concurrent\Worker\WorkerFactory
*/
function factory(WorkerFactory $factory = null)
{ {
static $instance; static $instance;
@ -57,8 +71,50 @@ if (!function_exists(__NAMESPACE__ . '\pool')) {
$instance = new DefaultWorkerFactory(); $instance = new DefaultWorkerFactory();
} }
$worker = $instance->create(); return $factory;
$worker->start(); }
return $worker;
/**
* Gets or sets the global worker queue instance.
*
* @param \Icicle\Concurrent\Worker\Queue|null $queue
*
* @return \Icicle\Concurrent\Worker\Queue
*/
function queue(Queue $queue = null)
{
static $instance;
if (null !== $queue) {
$instance = $queue;
} elseif (null === $instance) {
$instance = new DefaultQueue();
}
if (!$instance->isRunning()) {
$instance->start();
}
return $instance;
}
/**
* Pulls a worker from the global worker queue.
*
* @return \Icicle\Concurrent\Worker\Worker
*/
function pull()
{
return queue()->pull();
}
/**
* Pushes a worker back onto the global worker queue.
*
* @param \Icicle\Concurrent\Worker\Worker $worker
*/
function push(Worker $worker)
{
queue()->push($worker);
} }
} }