2016-12-29 19:16:04 -06:00
|
|
|
<?php
|
2015-07-13 17:30:59 -05:00
|
|
|
|
2017-11-29 15:01:32 -06:00
|
|
|
namespace Amp\Parallel\Context;
|
2016-08-18 11:04:48 -05:00
|
|
|
|
2018-10-06 10:05:49 -05:00
|
|
|
use Amp\Failure;
|
2017-05-18 09:51:31 +02:00
|
|
|
use Amp\Loop;
|
|
|
|
use Amp\Parallel\Sync\ChannelledSocket;
|
2017-07-28 17:34:24 -05:00
|
|
|
use Amp\Parallel\Sync\ExitResult;
|
2017-12-07 21:26:55 -06:00
|
|
|
use Amp\Parallel\Sync\SynchronizationError;
|
2017-05-18 09:51:31 +02:00
|
|
|
use Amp\Promise;
|
2018-10-06 10:05:49 -05:00
|
|
|
use Amp\Success;
|
2017-05-10 09:05:35 +02:00
|
|
|
use function Amp\call;
|
2015-07-26 17:53:00 -05:00
|
|
|
|
2015-07-15 12:36:32 -05:00
|
|
|
/**
 * Implements an execution context using native multi-threading.
 *
 * The thread context is not itself threaded. A local instance of the context is
 * maintained both in the context that creates the thread and in the thread
 * itself.
 */
final class Thread implements Context
{
    /**
     * Interval handed to Loop::repeat() for the "has the thread exited?" poll, and to Loop::delay() for the
     * grace period before the channel is closed once the thread is found to have stopped.
     */
    const EXIT_CHECK_FREQUENCY = 250;

    /** @var int Next ID handed out by start(); incremented for every started thread in this process. */
    private static $nextId = 1;

    /** @var Internal\Thread|null An internal thread instance; null until start() is called. */
    private $thread;

    /** @var \Amp\Parallel\Sync\ChannelledSocket|null A channel for communicating with the thread. */
    private $channel;

    /** @var resource|null The end of the socket pair that is handed to the internal thread. */
    private $socket;

    /** @var callable */
    private $function;

    /** @var mixed[] */
    private $args;

    /** @var int|null ID assigned in start(); null while the thread has not been started. */
    private $id;

    /** @var int PID of the process that called start(); 0 while the thread has not been started. */
    private $oid = 0;

    /** @var string|null Identifier of the exit-check loop watcher; null until start() has registered it. */
    private $watcher;

    /**
     * Checks if threading is enabled.
     *
     * @return bool True if threading is enabled, otherwise false.
     */
    public static function isSupported(): bool
    {
        return \extension_loaded('pthreads');
    }

    /**
     * Creates and starts a new thread.
     *
     * @param callable $function The callable to invoke in the thread. First argument is an instance of
     *     \Amp\Parallel\Sync\Channel.
     * @param mixed ...$args Additional arguments to pass to the given callable.
     *
     * @return Promise<Thread> The thread object that was spawned.
     */
    public static function run(callable $function, ...$args): Promise
    {
        $thread = new self($function, ...$args);

        return call(function () use ($thread) {
            yield $thread->start();
            return $thread;
        });
    }

    /**
     * Creates a new thread.
     *
     * @param callable $function The callable to invoke in the thread. First argument is an instance of
     *     \Amp\Parallel\Sync\Channel.
     * @param mixed ...$args Additional arguments to pass to the given callable.
     *
     * @throws \Error Thrown if the pthreads extension is not available.
     */
    public function __construct(callable $function, ...$args)
    {
        if (!self::isSupported()) {
            throw new \Error("The pthreads extension is required to create threads.");
        }

        $this->function = $function;
        $this->args = $args;
    }

    /**
     * Returns the thread to the condition before starting. The new thread can be started and run independently of the
     * first thread.
     */
    public function __clone()
    {
        $this->thread = null;
        $this->socket = null;
        $this->channel = null;
        $this->oid = 0;

        // The ID and the watcher belong to the original, started thread. Without resetting them an
        // un-started clone would report the original's ID from getId() and keep a handle on a loop
        // watcher it does not own.
        $this->id = null;
        $this->watcher = null;
    }

    /**
     * Kills the thread if it is still running.
     *
     * The kill is only attempted when the current process is the one that started the thread.
     *
     * @throws \Amp\Parallel\Context\ContextException
     */
    public function __destruct()
    {
        if (\getmypid() === $this->oid) {
            $this->kill();
        }
    }

    /**
     * Checks if the context is running.
     *
     * @return bool True if the context is running, otherwise false.
     */
    public function isRunning(): bool
    {
        return $this->channel !== null;
    }

    /**
     * Spawns the thread and begins the thread's execution.
     *
     * @return Promise<int> Resolved with the thread ID once the thread has started. The promise fails with a
     *     \Amp\Parallel\Context\ContextException if the socket pair could not be created or the thread could
     *     not be started.
     *
     * @throws \Amp\Parallel\Context\StatusError If the thread has already been started.
     */
    public function start(): Promise
    {
        if ($this->oid !== 0) {
            throw new StatusError('The thread has already been started.');
        }

        $this->oid = \getmypid();

        // Warnings are suppressed here and turned into a failed promise below.
        $sockets = @\stream_socket_pair(
            \stripos(\PHP_OS, "win") === 0 ? \STREAM_PF_INET : \STREAM_PF_UNIX,
            \STREAM_SOCK_STREAM,
            \STREAM_IPPROTO_IP
        );

        if ($sockets === false) {
            $message = "Failed to create socket pair";
            if ($error = \error_get_last()) {
                $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
            }
            return new Failure(new ContextException($message));
        }

        // One end stays here and backs the channel; the other end is handed to the internal thread.
        list($channel, $this->socket) = $sockets;

        $this->id = self::$nextId++;

        $thread = $this->thread = new Internal\Thread($this->id, $this->socket, $this->function, $this->args);

        if (!$this->thread->start(\PTHREADS_INHERIT_INI)) {
            return new Failure(new ContextException('Failed to start the thread.'));
        }

        $channel = $this->channel = new ChannelledSocket($channel, $channel);

        $this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function ($watcher) use ($thread, $channel) {
            if (!$thread->isRunning()) {
                // Delay closing to avoid race condition between thread exiting and data becoming available.
                Loop::delay(self::EXIT_CHECK_FREQUENCY, [$channel, "close"]);
                Loop::cancel($watcher);
            }
        });

        // The watcher is only enabled while join(), receive() or send() is waiting on the channel.
        Loop::disable($this->watcher);

        return new Success($this->id);
    }

    /**
     * Immediately kills the context.
     *
     * The channel and the exit-check watcher are released even when killing fails.
     *
     * @throws ContextException If killing the thread was unsuccessful.
     */
    public function kill()
    {
        if ($this->thread !== null) {
            try {
                if ($this->thread->isRunning() && !$this->thread->kill()) {
                    throw new ContextException('Could not kill thread.');
                }
            } finally {
                $this->close();
            }
        }
    }

    /**
     * Closes channel and socket if still open.
     */
    private function close()
    {
        if ($this->channel !== null) {
            $this->channel->close();
        }

        $this->channel = null;

        // The watcher is null when start() failed before registering it (e.g. the thread could not be
        // started). Cancelling unconditionally would pass null to Loop::cancel() and raise a TypeError
        // from kill() or the destructor. The ID is intentionally kept: join() disables the watcher in
        // its finally block after kill() may already have run close().
        if ($this->watcher !== null) {
            Loop::cancel($this->watcher);
        }
    }

    /**
     * Gets a promise that resolves when the context ends and joins with the
     * parent context.
     *
     * @return \Amp\Promise<mixed> Resolves with the result carried by the exit result received from the thread.
     *     The promise fails with a ContextException if receiving from the channel fails, or with a
     *     SynchronizationError if the received value is not an exit result; the thread is killed in both cases.
     *
     * @throws StatusError Thrown if the context has not been started or has already finished.
     */
    public function join(): Promise
    {
        if ($this->channel === null || $this->thread === null) {
            throw new StatusError('The thread has not been started or has already finished.');
        }

        return call(function () {
            Loop::enable($this->watcher);

            try {
                $response = yield $this->channel->receive();
            } catch (\Throwable $exception) {
                $this->kill();
                throw new ContextException("Failed to receive result from thread", 0, $exception);
            } finally {
                Loop::disable($this->watcher);
                $this->close();
            }

            if (!$response instanceof ExitResult) {
                $this->kill();
                throw new SynchronizationError('Did not receive an exit result from thread.');
            }

            return $response->getResult();
        });
    }

    /**
     * {@inheritdoc}
     *
     * The returned promise fails with a SynchronizationError if the thread sends its exit result instead of data.
     *
     * @throws StatusError Thrown if the thread has not been started or has already finished.
     */
    public function receive(): Promise
    {
        if ($this->channel === null) {
            throw new StatusError('The thread has not been started or has already finished.');
        }

        return call(function () {
            Loop::enable($this->watcher);

            try {
                $data = yield $this->channel->receive();
            } finally {
                Loop::disable($this->watcher);
            }

            if ($data instanceof ExitResult) {
                $data = $data->getResult();
                throw new SynchronizationError(\sprintf(
                    'Thread process unexpectedly exited with result of type: %s',
                    \is_object($data) ? \get_class($data) : \gettype($data)
                ));
            }

            return $data;
        });
    }

    /**
     * {@inheritdoc}
     *
     * @throws StatusError Thrown if the thread has not been started or has already finished.
     * @throws \Error Thrown if an exit result object is given as the data to send.
     */
    public function send($data): Promise
    {
        if ($this->channel === null) {
            throw new StatusError('The thread has not been started or has already finished.');
        }

        if ($data instanceof ExitResult) {
            throw new \Error('Cannot send exit result objects.');
        }

        return call(function () use ($data) {
            Loop::enable($this->watcher);

            try {
                $result = yield $this->channel->send($data);
            } finally {
                Loop::disable($this->watcher);
            }

            return $result;
        });
    }

    /**
     * Returns the ID of the thread. This ID will be unique to this process.
     *
     * @return int
     *
     * @throws \Amp\Parallel\Context\StatusError If the thread has not been started.
     */
    public function getId(): int
    {
        if ($this->id === null) {
            throw new StatusError('The thread has not been started');
        }

        return $this->id;
    }
}
|