1
0
mirror of https://github.com/danog/parallel.git synced 2024-12-02 09:37:57 +01:00
parallel/lib/Context/Thread.php

331 lines
9.2 KiB
PHP
Raw Permalink Normal View History

2016-12-30 02:16:04 +01:00
<?php
2015-07-14 00:30:59 +02:00
namespace Amp\Parallel\Context;
2016-08-18 18:04:48 +02:00
use Amp\Failure;
2017-05-18 09:51:31 +02:00
use Amp\Loop;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitResult;
2017-12-08 04:26:55 +01:00
use Amp\Parallel\Sync\SynchronizationError;
2017-05-18 09:51:31 +02:00
use Amp\Promise;
use Amp\Success;
use function Amp\call;
2015-07-27 00:53:00 +02:00
/**
* Implements an execution context using native multi-threading.
2015-08-11 00:38:58 +02:00
*
* The thread context is not itself threaded. A local instance of the context is
* maintained both in the context that creates the thread and in the thread
* itself.
*
* @deprecated ext-pthreads development has been halted, see https://github.com/krakjoe/pthreads/issues/929
*/
2018-10-21 17:54:46 +02:00
final class Thread implements Context
2018-10-07 16:50:45 +02:00
{
2017-03-09 23:55:11 +01:00
/**
 * Interval passed to Loop::repeat() and Loop::delay() when polling whether the thread has exited
 * (Amp's loop API takes these values in milliseconds).
 */
public const EXIT_CHECK_FREQUENCY = 250;

/** @var int ID that the next started thread will receive; incremented by start(). */
private static $nextId = 1;

/** @var Internal\Thread|null An internal thread instance; null until start() creates it. */
private $thread;

/** @var \Amp\Parallel\Sync\ChannelledSocket|null A channel for communicating with the thread; null when closed. */
private $channel;

/** @var resource|null Second socket of the pair created in start(); handed to the internal thread. */
private $socket;

/** @var callable The callable to invoke in the thread. */
private $function;

/** @var mixed[] Additional arguments to pass to the callable. */
private $args;

/** @var int|null ID assigned by start(); null on a newly constructed instance. */
private $id;

/** @var int PID of the process that called start(), or 0 if start() has not been called. */
private $oid = 0;

/** @var string|null Identifier of the repeating exit-check watcher; null until start() registers it. */
private $watcher;
/**
 * Reports whether native threads can be used in this PHP runtime.
 *
 * @return bool True when the pthreads extension is loaded, false otherwise.
 */
public static function isSupported(): bool
{
    $pthreadsLoaded = \extension_loaded('pthreads');

    return $pthreadsLoaded;
}
2015-08-07 01:59:25 +02:00
/**
 * Convenience factory: builds a thread context and starts it in one step.
 *
 * @param callable $function The callable to invoke in the thread. First argument is an instance of
 *     \Amp\Parallel\Sync\Channel.
 * @param mixed ...$args Additional arguments to pass to the given callable.
 *
 * @return Promise<Thread> Resolves with the started thread object once start() has completed.
 */
public static function run(callable $function, ...$args): Promise
{
    $context = new self($function, ...$args);

    // Wait for start() to finish before handing the context back to the caller.
    $startThenReturn = function () use ($context): \Generator {
        yield $context->start();

        return $context;
    };

    return call($startThenReturn);
}
2015-08-18 17:12:06 +02:00
/**
 * Prepares a thread context without starting it; call start() to spawn the thread.
 *
 * @param callable $function The callable to invoke in the thread. First argument is an instance of
 *     \Amp\Parallel\Sync\Channel.
 * @param mixed ...$args Additional arguments to pass to the given callable.
 *
 * @throws \Error Thrown if the pthreads extension is not available.
 */
public function __construct(callable $function, ...$args)
{
    if (self::isSupported() === false) {
        throw new \Error("The pthreads extension is required to create threads.");
    }

    // Only stored here; both values are handed to the internal thread in start().
    $this->function = $function;
    $this->args = $args;
}
/**
 * Returns the thread to the condition before starting. The new thread can be started and run independently of the
 * first thread.
 *
 * Only the callable and its arguments are carried over. All per-run state is cleared, including the thread ID and
 * the loop watcher identifier, so the clone neither reports the original's ID from getId() nor holds a handle on
 * the original's exit-check watcher.
 */
public function __clone()
{
    $this->thread = null;
    $this->socket = null;
    $this->channel = null;
    $this->id = null;
    $this->watcher = null;
    $this->oid = 0;
}
/**
 * Kills the thread on destruction, but only when the current process ID equals the one recorded by start().
 *
 * An instance that was never started has a recorded ID of 0 and is therefore left alone.
 *
 * @throws \Amp\Parallel\Context\ContextException
 */
public function __destruct()
{
    $ownedByThisProcess = $this->oid === \getmypid();

    if (!$ownedByThisProcess) {
        return;
    }

    $this->kill();
}
2015-08-18 17:12:06 +02:00
2015-07-27 00:53:00 +02:00
/**
 * Checks if the context is running.
 *
 * The context counts as running for as long as this object holds a channel to the thread.
 *
 * @return bool True if the context is running, otherwise false.
 */
public function isRunning(): bool
{
    $hasChannel = null !== $this->channel;

    return $hasChannel;
}
/**
 * Spawns the thread and begins the thread's execution.
 *
 * A socket pair is created: one end is wrapped in the channel kept by this object, the other end is passed to the
 * internal thread. A repeating loop watcher that checks for thread exit is registered and left disabled.
 *
 * @return Promise<int> Resolved with the thread ID once the thread has started. Fails with a ContextException if
 *     the socket pair could not be created or the thread could not be started.
 *
 * @throws \Amp\Parallel\Context\StatusError If the thread has already been started.
 */
public function start(): Promise
{
    if (0 !== $this->oid) {
        throw new StatusError('The thread has already been started.');
    }

    // Remember which process started the thread; the destructor only kills from that process.
    $this->oid = \getmypid();

    $onWindows = \stripos(\PHP_OS, "win") === 0;
    $domain = $onWindows ? STREAM_PF_INET : STREAM_PF_UNIX;

    $pair = @\stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

    if ($pair === false) {
        $message = "Failed to create socket pair";

        $error = \error_get_last();
        if ($error) {
            $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
        }

        return new Failure(new ContextException($message));
    }

    // First socket stays on this side; the second one is given to the thread.
    [$localSocket, $this->socket] = $pair;

    $this->id = self::$nextId++;

    $internalThread = new Internal\Thread($this->id, $this->socket, $this->function, $this->args);
    $this->thread = $internalThread;

    if (!$internalThread->start(\PTHREADS_INHERIT_INI)) {
        return new Failure(new ContextException('Failed to start the thread.'));
    }

    $socketChannel = new ChannelledSocket($localSocket, $localSocket);
    $this->channel = $socketChannel;

    $exitCheck = static function ($watcherId) use ($internalThread, $socketChannel): void {
        if ($internalThread->isRunning()) {
            return;
        }

        // Delay closing to avoid race condition between thread exiting and data becoming available.
        Loop::delay(self::EXIT_CHECK_FREQUENCY, [$socketChannel, "close"]);
        Loop::cancel($watcherId);
    };

    $this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, $exitCheck);

    // The watcher is only enabled while join(), receive() or send() has an operation pending.
    Loop::disable($this->watcher);

    return new Success($this->id);
}
2015-08-11 00:38:58 +02:00
/**
 * Immediately kills the context.
 *
 * Does nothing if no internal thread was ever created. Otherwise the channel and watcher are released through
 * close() whether or not the kill succeeds.
 *
 * @throws ContextException If killing the thread was unsuccessful.
 */
public function kill(): void
{
    if ($this->thread === null) {
        return;
    }

    try {
        $stillRunning = $this->thread->isRunning();

        if ($stillRunning && !$this->thread->kill()) {
            throw new ContextException('Could not kill thread.');
        }
    } finally {
        $this->close();
    }
}
/**
 * Closes the channel if it is still open and cancels the exit-check watcher if one was registered.
 *
 * Safe to call more than once.
 */
private function close(): void
{
    if ($this->channel !== null) {
        $this->channel->close();
    }

    $this->channel = null;

    // start() can return early (thread failed to start) after the internal thread object exists but before the
    // watcher is registered. kill() then reaches this method with no watcher, and Loop::cancel() requires a
    // string identifier, so skip the call instead of passing null.
    if ($this->watcher !== null) {
        Loop::cancel($this->watcher);
    }
}
2015-08-18 17:12:06 +02:00
/**
 * Gets a promise that resolves when the context ends and joins with the
 * parent context.
 *
 * The exit-check watcher is enabled while waiting for the thread's final message. The channel and watcher are
 * always released afterwards, so the context cannot be used again once the returned promise settles.
 *
 * @return \Amp\Promise<mixed> Resolves with the result carried by the thread's exit result.
 *
 * @throws StatusError Thrown if the context has not been started or has already finished.
 * @throws SynchronizationError Thrown if an exit status object is not received.
 * @throws ContextException If receiving the result from the thread fails.
 */
public function join(): Promise
{
    // Strict comparison, consistent with the channel checks in receive() and send().
    if ($this->channel === null || $this->thread === null) {
        throw new StatusError('The thread has not been started or has already finished.');
    }

    return call(function (): \Generator {
        Loop::enable($this->watcher);

        try {
            $response = yield $this->channel->receive();
        } catch (\Throwable $exception) {
            // The thread can no longer deliver a result; make sure it is not left running.
            $this->kill();
            throw new ContextException("Failed to receive result from thread", 0, $exception);
        } finally {
            Loop::disable($this->watcher);
            $this->close();
        }

        if (!$response instanceof ExitResult) {
            // Anything other than an exit result here means the thread broke the protocol.
            $this->kill();
            throw new SynchronizationError('Did not receive an exit result from thread.');
        }

        return $response->getResult();
    });
}
/**
 * {@inheritdoc}
 *
 * The exit-check watcher is enabled only while the receive operation is pending.
 *
 * @throws StatusError If the thread has not been started or has already finished.
 * @throws SynchronizationError If the thread sends its exit result instead of a regular message.
 */
public function receive(): Promise
{
    if ($this->channel === null) {
        // Same wording as send() and join(): this is a thread, and the channel is also null after it finished.
        throw new StatusError('The thread has not been started or has already finished.');
    }

    return call(function (): \Generator {
        Loop::enable($this->watcher);

        try {
            $data = yield $this->channel->receive();
        } finally {
            Loop::disable($this->watcher);
        }

        if ($data instanceof ExitResult) {
            // An exit result is only expected by join(); getting one here means the thread ended early.
            $data = $data->getResult();
            throw new SynchronizationError(\sprintf(
                'Thread process unexpectedly exited with result of type: %s',
                \is_object($data) ? \get_class($data) : \gettype($data)
            ));
        }

        return $data;
    });
}
2015-08-07 01:59:25 +02:00
/**
 * {@inheritdoc}
 *
 * The exit-check watcher is enabled only while the send operation is pending.
 *
 * @throws StatusError If the thread has not been started or has already finished.
 * @throws \Error If an exit result object is given as the data to send.
 */
public function send($data): Promise
{
    if (null === $this->channel) {
        throw new StatusError('The thread has not been started or has already finished.');
    }

    if ($data instanceof ExitResult) {
        throw new \Error('Cannot send exit result objects.');
    }

    $transmit = function () use ($data): \Generator {
        Loop::enable($this->watcher);

        try {
            // The finally block runs before the value is handed back to the caller.
            return yield $this->channel->send($data);
        } finally {
            Loop::disable($this->watcher);
        }
    };

    return call($transmit);
}
/**
 * Returns the ID of the thread. This ID will be unique to this process.
 *
 * @return int The ID assigned when the thread was started.
 *
 * @throws \Amp\Parallel\Context\StatusError If no ID has been assigned yet.
 */
public function getId(): int
{
    $threadId = $this->id;

    if ($threadId === null) {
        throw new StatusError('The thread has not been started');
    }

    return $threadId;
}
2015-07-14 00:30:59 +02:00
}