mirror of
https://github.com/danog/parallel.git
synced 2024-11-30 04:39:01 +01:00
Add worker environment
This commit is contained in:
parent
16f7172a61
commit
ae267e64ac
@ -12,6 +12,8 @@ $paths = [
|
||||
dirname(__DIR__) . '/vendor/autoload.php',
|
||||
];
|
||||
|
||||
$autoloadPath = null;
|
||||
|
||||
foreach ($paths as $path) {
|
||||
if (file_exists($path)) {
|
||||
$autoloadPath = $path;
|
||||
@ -19,7 +21,7 @@ foreach ($paths as $path) {
|
||||
}
|
||||
}
|
||||
|
||||
if (!isset($autoloadPath)) {
|
||||
if (null === $autoloadPath) {
|
||||
fwrite(STDERR, 'Could not locate autoload.php.');
|
||||
exit(1);
|
||||
}
|
||||
@ -29,6 +31,7 @@ require $autoloadPath;
|
||||
use Icicle\Concurrent\Sync\Channel;
|
||||
use Icicle\Concurrent\Sync\Internal\ExitFailure;
|
||||
use Icicle\Concurrent\Sync\Internal\ExitSuccess;
|
||||
use Icicle\Concurrent\Worker\Environment;
|
||||
use Icicle\Concurrent\Worker\Internal\TaskRunner;
|
||||
use Icicle\Coroutine;
|
||||
use Icicle\Loop;
|
||||
@ -37,8 +40,9 @@ use Icicle\Socket\Stream\WritableStream;
|
||||
|
||||
Coroutine\create(function () {
|
||||
$channel = new Channel(new ReadableStream(STDIN), new WritableStream(STDOUT));
|
||||
$environment = new Environment();
|
||||
|
||||
$runner = new TaskRunner($channel);
|
||||
$runner = new TaskRunner($channel, $environment);
|
||||
|
||||
try {
|
||||
$result = new ExitSuccess(yield $runner->run());
|
||||
|
@ -19,8 +19,8 @@
|
||||
}
|
||||
],
|
||||
"require": {
|
||||
"icicleio/icicle": "^0.8",
|
||||
"icicleio/socket": "^0.3"
|
||||
"icicleio/icicle": "^0.8.2",
|
||||
"icicleio/socket": "^0.3.1"
|
||||
},
|
||||
"require-dev": {
|
||||
"phpunit/phpunit": "^4.6",
|
||||
|
121
src/Worker/Environment.php
Normal file
121
src/Worker/Environment.php
Normal file
@ -0,0 +1,121 @@
|
||||
<?php
|
||||
namespace Icicle\Concurrent\Worker;
|
||||
|
||||
use Icicle\Loop;
|
||||
|
||||
class Environment
|
||||
{
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
private $data = [];
|
||||
|
||||
/**
|
||||
* @var \SplPriorityQueue
|
||||
*/
|
||||
private $queue;
|
||||
|
||||
/**
|
||||
* @var \Icicle\Loop\Events\TimerInterface
|
||||
*/
|
||||
private $timer;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$this->queue = new \SplPriorityQueue();
|
||||
|
||||
$this->timer = Loop\periodic(1, function () {
|
||||
$time = time();
|
||||
while (!$this->queue->isEmpty()) {
|
||||
$key = $this->queue->top();
|
||||
|
||||
if (isset($this->data[$key])) {
|
||||
list( , $expire) = $this->data[$key];
|
||||
|
||||
if ($time < -$expire) {
|
||||
break;
|
||||
}
|
||||
|
||||
unset($this->data[$key]);
|
||||
}
|
||||
|
||||
$this->queue->extract();
|
||||
}
|
||||
|
||||
if ($this->queue->isEmpty()) {
|
||||
$this->timer->stop();
|
||||
}
|
||||
});
|
||||
|
||||
$this->timer->stop();
|
||||
$this->timer->unreference();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function exists($key)
|
||||
{
|
||||
return isset($this->data[$key]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
public function get($key)
|
||||
{
|
||||
list($value, , $ttl) = $this->data[$key];
|
||||
|
||||
if (0 !== $ttl) {
|
||||
$this->data[$key] = [$value, -(time() + $ttl), $ttl];
|
||||
}
|
||||
|
||||
return $value;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $ttl
|
||||
*/
|
||||
public function set($key, $value, $ttl = 0)
|
||||
{
|
||||
$ttl = (int) $ttl;
|
||||
if (0 > $ttl) {
|
||||
$ttl = 0;
|
||||
}
|
||||
|
||||
if (0 !== $ttl) {
|
||||
$expire = time() + $ttl;
|
||||
$this->queue->insert($key, -$expire);
|
||||
|
||||
if (!$this->timer->isPending()) {
|
||||
$this->timer->start();
|
||||
}
|
||||
} else {
|
||||
$expire = 0;
|
||||
}
|
||||
|
||||
$this->data[$key] = [$value, $expire, $ttl];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
*/
|
||||
public function delete($key)
|
||||
{
|
||||
unset($this->data[$key]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
*/
|
||||
public function count()
|
||||
{
|
||||
return count($this->data);
|
||||
}
|
||||
}
|
@ -3,7 +3,7 @@ namespace Icicle\Concurrent\Worker;
|
||||
|
||||
class HelloTask implements TaskInterface
|
||||
{
|
||||
public function run()
|
||||
public function run(Environment $environment)
|
||||
{
|
||||
return "Hello, world!";
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
namespace Icicle\Concurrent\Worker\Internal;
|
||||
|
||||
use Icicle\Concurrent\ChannelInterface;
|
||||
use Icicle\Concurrent\Worker\Environment;
|
||||
use Icicle\Concurrent\Worker\TaskInterface;
|
||||
|
||||
class TaskRunner
|
||||
@ -16,9 +17,15 @@ class TaskRunner
|
||||
*/
|
||||
private $channel;
|
||||
|
||||
public function __construct(ChannelInterface $channel)
|
||||
/**
|
||||
* @var \Icicle\Concurrent\Worker\Environment
|
||||
*/
|
||||
private $environment;
|
||||
|
||||
public function __construct(ChannelInterface $channel, Environment $environment)
|
||||
{
|
||||
$this->channel = $channel;
|
||||
$this->environment = $environment;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -34,7 +41,7 @@ class TaskRunner
|
||||
$this->idle = false;
|
||||
|
||||
try {
|
||||
$result = (yield $task->run());
|
||||
$result = (yield $task->run($this->environment));
|
||||
} catch (\Exception $exception) {
|
||||
$result = new TaskFailure($exception);
|
||||
}
|
||||
|
@ -13,9 +13,11 @@ interface TaskInterface
|
||||
*
|
||||
* Does not have to be a coroutine, can also be a regular function returning a value.
|
||||
*
|
||||
* @param \Icicle\Concurrent\Worker\Environment
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @resolve mixed
|
||||
*/
|
||||
public function run();
|
||||
public function run(Environment $environment);
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ class WorkerFork extends Worker
|
||||
public function __construct()
|
||||
{
|
||||
parent::__construct(new Fork(function () {
|
||||
$runner = new TaskRunner($this);
|
||||
$runner = new TaskRunner($this, new Environment());
|
||||
yield $runner->run();
|
||||
}));
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ class WorkerThread extends Worker
|
||||
public function __construct()
|
||||
{
|
||||
parent::__construct(new Thread(function () {
|
||||
$runner = new TaskRunner($this);
|
||||
$runner = new TaskRunner($this, new Environment());
|
||||
yield $runner->run();
|
||||
}));
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
<?php
|
||||
namespace Icicle\Tests\Concurrent\Worker;
|
||||
|
||||
use Icicle\Concurrent\Worker\Environment;
|
||||
use Icicle\Concurrent\Worker\TaskInterface;
|
||||
|
||||
class TestTask implements TaskInterface
|
||||
@ -12,7 +13,7 @@ class TestTask implements TaskInterface
|
||||
$this->returnValue = $returnValue;
|
||||
}
|
||||
|
||||
public function run()
|
||||
public function run(Environment $environment)
|
||||
{
|
||||
return $this->returnValue;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user