1
0
mirror of https://github.com/danog/parallel.git synced 2024-12-02 17:52:14 +01:00
parallel/src/Process/ChannelledProcess.php

146 lines
3.2 KiB
PHP
Raw Normal View History

2015-08-27 16:10:08 +02:00
<?php
namespace Icicle\Concurrent\Process;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Exception\StatusError;
2015-08-27 16:10:08 +02:00
use Icicle\Concurrent\Exception\SynchronizationError;
2015-12-05 06:50:32 +01:00
use Icicle\Concurrent\Process as ProcessContext;
2015-08-27 16:10:08 +02:00
use Icicle\Concurrent\Sync\Channel;
2015-12-05 06:50:32 +01:00
use Icicle\Concurrent\Sync\DataChannel;
use Icicle\Concurrent\Sync\Internal\ExitStatus;
2015-08-27 16:10:08 +02:00
2015-12-05 06:50:32 +01:00
class ChannelledProcess implements Channel, ProcessContext
2015-08-27 16:10:08 +02:00
{
/**
* @var \Icicle\Concurrent\Process\Process
*/
private $process;
/**
* @var \Icicle\Concurrent\Sync\Channel
*/
private $channel;
/**
* @param string $path Path to PHP script.
* @param string $cwd Working directory.
* @param mixed[] $env Array of environment variables.
*/
public function __construct($path, $cwd = '', array $env = [])
{
$command = PHP_BINARY . ' ' . $path;
$this->process = new Process($command, $cwd, $env);
}
/**
* Resets process values.
*/
public function __clone()
{
$this->process = clone $this->process;
$this->channel = null;
}
2015-08-27 16:10:08 +02:00
/**
* {@inheritdoc}
*/
public function start()
{
$this->process->start();
2015-12-05 06:50:32 +01:00
$this->channel = new DataChannel($this->process->getStdOut(), $this->process->getStdIn());
2015-08-27 16:10:08 +02:00
}
/**
* {@inheritdoc}
*/
public function isRunning()
{
return $this->process->isRunning();
}
/**
* {@inheritdoc}
*/
public function receive()
{
if (null === $this->channel) {
throw new StatusError('The process has not been started.');
2015-08-27 16:10:08 +02:00
}
$data = (yield $this->channel->receive());
2015-12-05 06:50:32 +01:00
if ($data instanceof ExitStatus) {
$data = $data->getResult();
throw new SynchronizationError(sprintf(
'Thread unexpectedly exited with result of type: %s',
is_object($data) ? get_class($data) : gettype($data)
));
}
yield $data;
2015-08-27 16:10:08 +02:00
}
/**
* {@inheritdoc}
*/
public function send($data)
{
if (null === $this->channel) {
throw new StatusError('The process has not been started.');
}
2015-12-05 06:50:32 +01:00
if ($data instanceof ExitStatus) {
throw new InvalidArgumentError('Cannot send exit status objects.');
2015-08-27 16:10:08 +02:00
}
yield $this->channel->send($data);
}
/**
* {@inheritdoc}
*/
public function join()
{
if (null === $this->channel) {
throw new StatusError('The process has not been started.');
}
$response = (yield $this->channel->receive());
yield $this->process->join();
2015-12-05 06:50:32 +01:00
if (!$response instanceof ExitStatus) {
throw new SynchronizationError('Did not receive an exit status from thread.');
}
yield $response->getResult();
2015-08-27 16:10:08 +02:00
}
/**
* {@inheritdoc}
*/
public function kill()
{
$this->process->kill();
$this->channel = null;
2015-08-27 16:10:08 +02:00
}
/**
* {@inheritdoc}
*/
public function getPid()
{
return $this->process->getPid();
}
/**
* {@inheritdoc}
*/
public function signal($signo)
{
$this->process->signal($signo);
}
2015-08-27 16:10:08 +02:00
}