1
0
mirror of https://github.com/danog/parallel.git synced 2024-12-03 10:07:49 +01:00
parallel/lib/Threading/Thread.php

262 lines
6.8 KiB
PHP
Raw Normal View History

2015-07-14 00:30:59 +02:00
<?php
2016-08-18 18:04:48 +02:00
namespace Amp\Concurrent\Threading;
use Amp\Concurrent\{ContextException, StatusError, SynchronizationError, Strand};
use Amp\Concurrent\Sync\{ChannelledStream, Internal\ExitStatus};
use Amp\Coroutine;
use Amp\Socket\Socket;
use Interop\Async\Awaitable;
2015-07-27 00:53:00 +02:00
/**
* Implements an execution context using native multi-threading.
2015-08-11 00:38:58 +02:00
*
* The thread context is not itself threaded. A local instance of the context is
* maintained both in the context that creates the thread and in the thread
* itself.
*/
2016-08-18 18:04:48 +02:00
class Thread implements Strand {
2015-07-27 00:53:00 +02:00
/**
2015-08-31 00:52:00 +02:00
* @var Internal\Thread An internal thread instance.
2015-07-27 00:53:00 +02:00
*/
private $thread;
2015-07-27 00:53:00 +02:00
/**
2016-08-18 18:04:48 +02:00
* @var \Amp\Concurrent\Sync\Channel A channel for communicating with the thread.
*/
private $channel;
/**
2016-08-18 18:04:48 +02:00
* @var \Amp\Socket\Socket
*/
private $pipe;
/**
* @var resource
*/
private $socket;
/**
* @var callable
*/
private $function;
/**
* @var mixed[]
*/
private $args;
2015-09-08 19:55:29 +02:00
/**
2015-09-19 05:20:35 +02:00
* @var int
2015-09-08 19:55:29 +02:00
*/
2015-09-19 05:20:35 +02:00
private $oid = 0;
2015-09-08 19:55:29 +02:00
/**
* Checks if threading is enabled.
*
* @return bool True if threading is enabled, otherwise false.
*/
2016-08-18 18:04:48 +02:00
public static function supported(): bool {
return \extension_loaded('pthreads');
}
2015-08-07 01:59:25 +02:00
/**
* Spawns a new thread and runs it.
*
2015-08-31 00:52:00 +02:00
* @param callable $function The callable to invoke in the thread.
*
2015-08-31 00:52:00 +02:00
* @return Thread The thread object that was spawned.
2015-08-07 01:59:25 +02:00
*/
2016-08-18 18:04:48 +02:00
public static function spawn(callable $function, ...$args) {
2016-01-23 07:00:56 +01:00
$thread = new self($function, ...$args);
$thread->start();
return $thread;
}
2015-08-18 17:12:06 +02:00
/**
2015-08-31 00:52:00 +02:00
* Creates a new thread.
*
2015-08-31 00:52:00 +02:00
* @param callable $function The callable to invoke in the thread when run.
*
2016-08-18 18:04:48 +02:00
* @throws \Error Thrown if the pthreads extension is not available.
2015-08-18 17:12:06 +02:00
*/
2016-08-18 18:04:48 +02:00
public function __construct(callable $function, ...$args) {
if (!self::supported()) {
throw new \Error("The pthreads extension is required to create threads.");
}
$this->function = $function;
$this->args = $args;
}
/**
* Returns the thread to the condition before starting. The new thread can be started and run independently of the
* first thread.
*/
2016-08-18 18:04:48 +02:00
public function __clone() {
$this->thread = null;
$this->socket = null;
$this->pipe = null;
$this->channel = null;
2015-09-19 05:20:35 +02:00
$this->oid = 0;
}
/**
* Kills the thread if it is still running.
*
2016-08-18 18:04:48 +02:00
* @throws \Amp\Concurrent\ContextException
2015-09-19 05:20:35 +02:00
*/
2016-08-18 18:04:48 +02:00
public function __destruct() {
if (\getmypid() === $this->oid) {
2015-09-19 05:20:35 +02:00
$this->kill();
}
}
2015-08-18 17:12:06 +02:00
2015-07-27 00:53:00 +02:00
/**
* Checks if the context is running.
2015-07-27 00:53:00 +02:00
*
* @return bool True if the context is running, otherwise false.
2015-07-27 00:53:00 +02:00
*/
2016-08-18 18:04:48 +02:00
public function isRunning(): bool {
return null !== $this->pipe && $this->pipe->isReadable();
2015-08-07 01:59:25 +02:00
}
/**
2015-08-31 00:52:00 +02:00
* Spawns the thread and begins the thread's execution.
*
2016-08-18 18:04:48 +02:00
* @throws \Amp\Concurrent\StatusError If the thread has already been started.
* @throws \Amp\Concurrent\ContextException If starting the thread was unsuccessful.
*/
2016-08-18 18:04:48 +02:00
public function start() {
2015-09-19 05:20:35 +02:00
if (0 !== $this->oid) {
throw new StatusError('The thread has already been started.');
}
2016-08-18 18:04:48 +02:00
$this->oid = \getmypid();
2015-09-08 19:55:29 +02:00
2016-08-18 18:04:48 +02:00
list($channel, $this->socket) = \Amp\Socket\pair();
$this->thread = new Internal\Thread($this->socket, $this->function, $this->args);
2016-08-18 18:04:48 +02:00
if (!$this->thread->start(PTHREADS_INHERIT_INI)) {
throw new ContextException('Failed to start the thread.');
}
2016-08-18 18:04:48 +02:00
$this->channel = new ChannelledStream($this->pipe = new Socket($channel));
}
2015-08-11 00:38:58 +02:00
/**
* Immediately kills the context.
2015-08-31 00:52:00 +02:00
*
2016-08-18 18:04:48 +02:00
* @throws ContextException If killing the thread was unsuccessful.
*/
2016-08-18 18:04:48 +02:00
public function kill() {
2015-09-06 21:59:24 +02:00
if (null !== $this->thread) {
2015-09-08 19:55:29 +02:00
try {
if ($this->thread->isRunning() && !$this->thread->kill()) {
2016-08-18 18:04:48 +02:00
throw new ContextException('Could not kill thread.');
2015-09-08 19:55:29 +02:00
}
} finally {
$this->close();
2015-09-06 21:59:24 +02:00
}
}
}
/**
* Closes channel and socket if still open.
*/
2016-08-18 18:04:48 +02:00
private function close() {
if (null !== $this->pipe && $this->pipe->isReadable()) {
$this->pipe->close();
}
2016-08-18 18:04:48 +02:00
if (\is_resource($this->socket)) {
@\fclose($this->socket);
2015-08-28 23:18:02 +02:00
}
2015-09-08 19:55:29 +02:00
$this->thread = null;
$this->channel = null;
}
2015-08-18 17:12:06 +02:00
/**
* Gets a promise that resolves when the context ends and joins with the
* parent context.
2015-08-18 17:12:06 +02:00
*
2016-08-18 18:04:48 +02:00
* @return \Interop\Async\Awaitable<mixed>
*
* @throws StatusError Thrown if the context has not been started.
2015-08-31 00:52:00 +02:00
* @throws SynchronizationError Thrown if an exit status object is not received.
2015-08-18 17:12:06 +02:00
*/
2016-08-18 18:04:48 +02:00
public function join(): Awaitable {
2015-09-08 19:55:29 +02:00
if (null === $this->channel || null === $this->thread) {
throw new StatusError('The thread has not been started or has already finished.');
}
2016-08-18 18:04:48 +02:00
return new Coroutine($this->doJoin());
}
/**
* @coroutine
*
* @return \Generator
*
* @throws \Amp\Concurrent\SynchronizationError If the thread does not send an exit status.
*/
private function doJoin(): \Generator {
2015-08-18 17:12:06 +02:00
try {
2016-08-18 18:04:48 +02:00
$response = yield $this->channel->receive();
2015-12-05 06:50:32 +01:00
if (!$response instanceof ExitStatus) {
throw new SynchronizationError('Did not receive an exit status from thread.');
2015-08-18 17:12:06 +02:00
}
2016-01-23 07:00:56 +01:00
$result = $response->getResult();
$this->thread->join();
2016-01-23 07:00:56 +01:00
} catch (\Throwable $exception) {
$this->kill();
throw $exception;
2015-08-18 17:12:06 +02:00
}
$this->close();
2016-01-23 07:00:56 +01:00
return $result;
2015-08-18 17:12:06 +02:00
}
/**
* {@inheritdoc}
2015-08-18 17:12:06 +02:00
*/
2016-08-18 18:04:48 +02:00
public function receive(): Awaitable {
2015-09-08 19:55:29 +02:00
if (null === $this->channel) {
2016-08-18 18:04:48 +02:00
throw new StatusError('The process has not been started.');
}
2016-08-18 18:04:48 +02:00
return \Amp\pipe($this->channel->receive(), static function ($data) {
if ($data instanceof ExitStatus) {
$data = $data->getResult();
throw new SynchronizationError(\sprintf(
'Thread unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
));
}
return $data;
});
2015-08-18 17:12:06 +02:00
}
2015-08-07 01:59:25 +02:00
/**
* {@inheritdoc}
2015-08-07 01:59:25 +02:00
*/
2016-08-18 18:04:48 +02:00
public function send($data): Awaitable {
2015-09-08 19:55:29 +02:00
if (null === $this->channel) {
throw new StatusError('The thread has not been started or has already finished.');
}
2015-12-05 06:50:32 +01:00
if ($data instanceof ExitStatus) {
2016-08-18 18:04:48 +02:00
throw new \Error('Cannot send exit status objects.');
2015-07-27 00:53:00 +02:00
}
2015-07-14 00:30:59 +02:00
2016-08-18 18:04:48 +02:00
return $this->channel->send($data);
}
2015-07-14 00:30:59 +02:00
}