1
0
mirror of https://github.com/danog/parallel.git synced 2024-12-02 17:52:14 +01:00
parallel/src/Process/ChannelledProcess.php

133 lines
3.1 KiB
PHP
Raw Normal View History

2015-08-27 16:10:08 +02:00
<?php
namespace Icicle\Concurrent\Process;
use Icicle\Concurrent\ContextInterface;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Exception\StatusError;
2015-08-27 16:10:08 +02:00
use Icicle\Concurrent\Exception\SynchronizationError;
use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ChannelInterface;
2015-08-29 09:13:14 +02:00
use Icicle\Concurrent\Sync\Internal\ExitStatusInterface;
2015-08-27 16:10:08 +02:00
class ChannelledProcess implements ContextInterface
{
/**
* @var \Icicle\Concurrent\Process\Process
*/
private $process;
/**
* @var \Icicle\Concurrent\Sync\Channel
*/
private $channel;
/**
* @param string $path Path to PHP script.
* @param string $cwd Working directory.
* @param mixed[] $env Array of environment variables.
*/
public function __construct($path, $cwd = '', array $env = [])
{
$command = PHP_BINARY . ' ' . $path;
$this->process = new Process($command, $cwd, $env);
}
/**
* Resets process values.
*/
public function __clone()
{
$this->process = clone $this->process;
$this->channel = null;
}
2015-08-27 16:10:08 +02:00
/**
* {@inheritdoc}
*/
public function start()
{
$this->process->start();
$this->channel = new Channel($this->process->getStdOut(), $this->process->getStdIn());
}
/**
* {@inheritdoc}
*/
public function isRunning()
{
return $this->process->isRunning();
}
/**
* {@inheritdoc}
*/
public function receive()
{
if (!$this->channel instanceof ChannelInterface) {
throw new StatusError('The process has not been started.');
2015-08-27 16:10:08 +02:00
}
$data = (yield $this->channel->receive());
if ($data instanceof ExitStatusInterface) {
$data = $data->getResult();
throw new SynchronizationError(sprintf(
'Thread unexpectedly exited with result of type: %s',
is_object($data) ? get_class($data) : gettype($data)
));
}
yield $data;
2015-08-27 16:10:08 +02:00
}
/**
* {@inheritdoc}
*/
public function send($data)
{
if (!$this->channel instanceof ChannelInterface) {
throw new StatusError('The process has not been started.');
}
if ($data instanceof ExitStatusInterface) {
throw new InvalidArgumentError('Cannot send exit status objects.');
2015-08-27 16:10:08 +02:00
}
yield $this->channel->send($data);
}
/**
* {@inheritdoc}
*/
public function join()
{
if (!$this->channel instanceof ChannelInterface) {
throw new StatusError('The process has not been started.');
}
try {
$response = (yield $this->channel->receive());
yield $this->process->join();
if (!$response instanceof ExitStatusInterface) {
throw new SynchronizationError('Did not receive an exit status from thread.');
}
yield $response->getResult();
} finally {
$this->channel->close();
}
2015-08-27 16:10:08 +02:00
}
/**
* {@inheritdoc}
*/
public function kill()
{
$this->process->kill();
}
}