1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-23 06:21:12 +01:00

Create stream instances in process

This commit is contained in:
Aaron Piotrowski 2015-08-25 10:27:39 -05:00
parent 3c48042c30
commit 57d31d6b80
4 changed files with 40 additions and 23 deletions

View File

@ -13,7 +13,7 @@ Coroutine\create(function () {
$returnValue = (yield $worker->enqueue(new HelloTask())); $returnValue = (yield $worker->enqueue(new HelloTask()));
printf("Return value: %d\n", $returnValue); printf("Return value: %s\n", $returnValue);
$code = (yield $worker->shutdown()); $code = (yield $worker->shutdown());
printf("Code: %d\n", $code); printf("Code: %d\n", $code);

View File

@ -34,17 +34,17 @@ class Process
private $options; private $options;
/** /**
* @var resource|null * @var \Icicle\Socket\Stream\WritableStream|null
*/ */
private $stdin; private $stdin;
/** /**
* @var resource|null * @var \Icicle\Socket\Stream\ReadableStream|null
*/ */
private $stdout; private $stdout;
/** /**
* @var resource|null * @var \Icicle\Socket\Stream\ReadableStream|null
*/ */
private $stderr; private $stderr;
@ -60,8 +60,9 @@ class Process
/** /**
* @param string $command Command to run. * @param string $command Command to run.
* @param string|null $cwd Working directory or use null to use the working directory of the current PHP process. * @param string|null $cwd Working directory or use an empty string to use the working directory of the current
* @param mixed[] $env Environment variables or use null to inherit from the current PHP process. * PHP process.
* @param mixed[] $env Environment variables or use an empty array to inherit from the current PHP process.
* @param mixed[] $options Options for proc_open(). * @param mixed[] $options Options for proc_open().
*/ */
public function __construct($command, $cwd = '', array $env = [], array $options = []) public function __construct($command, $cwd = '', array $env = [], array $options = [])
@ -87,6 +88,18 @@ class Process
public function __destruct() public function __destruct()
{ {
$this->kill(); // Will only terminate if the process is still running. $this->kill(); // Will only terminate if the process is still running.
if (null !== $this->stdin) {
$this->stdin->close();
}
if (null !== $this->stdout) {
$this->stdout->close();
}
if (null !== $this->stderr) {
$this->stderr->close();
}
} }
/** /**
@ -130,13 +143,11 @@ class Process
$this->pid = $status['pid']; $this->pid = $status['pid'];
list($this->stdin, $this->stdout, $this->stderr, $stream) = $pipes; $this->stdin = new WritableStream($pipes[0]);
$this->stdout = new ReadableStream($pipes[1]);
$this->stderr = new ReadableStream($pipes[2]);
// $this->stdin = new WritableStream($pipes[0]); $stream = new ReadableStream($pipes[3]);
// $this->stdout = new ReadableStream($pipes[1]);
// $this->stderr = new ReadableStream($pipes[2]);
$stream = new ReadableStream($stream);
$this->promise = new Coroutine($stream->read()); $this->promise = new Coroutine($stream->read());
$this->promise = $this->promise $this->promise = $this->promise
@ -147,9 +158,11 @@ class Process
return (int) $code; return (int) $code;
}) })
->cleanup(function () use ($stream) { ->cleanup(function () use ($stream) {
proc_close($this->process); if (is_resource($this->process)) {
$this->process = null; proc_close($this->process);
//$this->stdin->close(); $this->process = null;
}
$this->stdin->close();
$stream->close(); $stream->close();
}); });
} }
@ -168,8 +181,8 @@ class Process
try { try {
yield $this->promise; yield $this->promise;
} finally { } finally {
//$this->stdout->close(); $this->stdout->close();
//$this->stderr->close(); $this->stderr->close();
} }
} }
@ -267,7 +280,7 @@ class Process
/** /**
* Gets the process input stream (STDIN). * Gets the process input stream (STDIN).
* *
* @return resource * @return \Icicle\Socket\Stream\WritableStream
* *
* @throws Exception If the process has not been started. * @throws Exception If the process has not been started.
*/ */
@ -283,7 +296,7 @@ class Process
/** /**
* Gets the process output stream (STDOUT). * Gets the process output stream (STDOUT).
* *
* @return resource * @return \Icicle\Socket\Stream\ReadableStream
* *
* @throws Exception If the process has not been started. * @throws Exception If the process has not been started.
*/ */
@ -299,7 +312,7 @@ class Process
/** /**
* Gets the process error stream (STDERR). * Gets the process error stream (STDERR).
* *
* @return resource * @return \Icicle\Socket\Stream\ReadableStream
* *
* @throws Exception If the process has not been started. * @throws Exception If the process has not been started.
*/ */

View File

@ -5,7 +5,6 @@ class HelloTask implements TaskInterface
{ {
public function run() public function run()
{ {
echo "Hello"; return "Hello, world!";
return 42;
} }
} }

View File

@ -7,6 +7,8 @@ use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ChannelInterface; use Icicle\Concurrent\Sync\ChannelInterface;
use Icicle\Coroutine\Coroutine; use Icicle\Coroutine\Coroutine;
use Icicle\Loop; use Icicle\Loop;
use Icicle\Socket\Stream\ReadableStream;
use Icicle\Socket\Stream\WritableStream;
function run(ChannelInterface $channel) function run(ChannelInterface $channel)
{ {
@ -30,12 +32,15 @@ function run(ChannelInterface $channel)
} }
} }
// Redirect all output written using echo, print, printf, etc. to STDERR.
ob_start(function ($data) { ob_start(function ($data) {
$written = fwrite(STDERR, $data); $written = fwrite(STDERR, $data);
return ''; return '';
}, 1); }, 1);
$coroutine = new Coroutine(run(new Channel(STDIN, STDOUT))); $coroutine = new Coroutine(
run(new Channel(new ReadableStream(STDIN), new WritableStream(STDOUT)))
);
$coroutine->done(); $coroutine->done();
Loop\run(); Loop\run();