diff --git a/.gitignore b/.gitignore index c64b8c8..5b9cdb5 100755 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.php_cs.cache .vagrant build composer.lock diff --git a/.php_cs.dist b/.php_cs.dist new file mode 100644 index 0000000..29b5fb6 --- /dev/null +++ b/.php_cs.dist @@ -0,0 +1,40 @@ +setRiskyAllowed(true) + ->setRules([ + "@PSR1" => true, + "@PSR2" => true, + "braces" => [ + "allow_single_line_closure" => true, + "position_after_functions_and_oop_constructs" => "same", + ], + "array_syntax" => ["syntax" => "short"], + "cast_spaces" => true, + "combine_consecutive_unsets" => true, + "function_to_constant" => true, + "no_multiline_whitespace_before_semicolons" => true, + "no_unused_imports" => true, + "no_useless_else" => true, + "no_useless_return" => true, + "no_whitespace_before_comma_in_array" => true, + "no_whitespace_in_blank_line" => true, + "non_printable_character" => true, + "normalize_index_brace" => true, + "ordered_imports" => true, + "php_unit_construct" => true, + "php_unit_dedicate_assert" => true, + "php_unit_fqcn_annotation" => true, + "phpdoc_summary" => true, + "phpdoc_types" => true, + "psr4" => true, + "return_type_declaration" => ["space_before" => "none"], + "short_scalar_cast" => true, + "single_blank_line_before_namespace" => true, + ]) + ->setFinder( + PhpCsFixer\Finder::create() + ->in(__DIR__ . "/example") + ->in(__DIR__ . "/lib") + ->in(__DIR__ . "/test") + ); diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6c3291b --- /dev/null +++ b/Makefile @@ -0,0 +1,45 @@ +PHP_BIN := php +COMPOSER_BIN := composer + +COVERAGE = coverage +SRCS = lib test + +find_php_files = $(shell find $(1) -type f -name "*.php") +src = $(foreach d,$(SRCS),$(call find_php_files,$(d))) + +.PHONY: test +test: setup phpunit code-style + +.PHONY: clean +clean: clean-coverage clean-vendor + +.PHONY: clean-coverage +clean-coverage: + test ! -e coverage || rm -r coverage + +.PHONY: clean-vendor +clean-vendor: + test ! -e vendor || rm -r vendor + +.PHONY: setup +setup: vendor/autoload.php + +.PHONY: deps-update +deps-update: + $(COMPOSER_BIN) update + +.PHONY: phpunit +phpunit: setup + $(PHP_BIN) vendor/bin/phpunit + +.PHONY: code-style +code-style: setup + PHP_CS_FIXER_IGNORE_ENV=1 $(PHP_BIN) vendor/bin/php-cs-fixer --diff -v fix + +composer.lock: composer.json + $(COMPOSER_BIN) install + touch $@ + +vendor/autoload.php: composer.lock + $(COMPOSER_BIN) install + touch $@ diff --git a/composer.json b/composer.json index 7359145..3732346 100755 --- a/composer.json +++ b/composer.json @@ -21,13 +21,14 @@ } ], "require": { - "amphp/amp": "^2.0", + "amphp/amp": "^2", "amphp/byte-stream": "dev-master as 0.1", "amphp/process": "dev-amp_v2 as 0.2" }, "require-dev": { "amphp/phpunit-util": "dev-master", - "phpunit/phpunit": "^6.0" + "friendsofphp/php-cs-fixer": "^2.3", + "phpunit/phpunit": "^6" }, "minimum-stability": "dev", "suggest": { diff --git a/example/BlockingTask.php b/example/BlockingTask.php index d8bbdd7..840c695 100644 --- a/example/BlockingTask.php +++ b/example/BlockingTask.php @@ -1,7 +1,8 @@ receive()); - + print "Sleeping for 3 seconds...\n"; sleep(3); // Blocking call in thread. - + yield $this->send("Data sent from child."); - + print "Sleeping for 2 seconds...\n"; sleep(2); // Blocking call in thread. - + return 42; }); - + print "Waiting 2 seconds to send start data...\n"; yield new Pause(2000); - + yield $context->send("Start data"); - + printf("Received the following from child: %s\n", yield $context->receive()); printf("Thread ended with value %d!\n", yield $context->join()); } finally { diff --git a/example/worker-pool.php b/example/worker-pool.php index d8ecb20..892a87e 100755 --- a/example/worker-pool.php +++ b/example/worker-pool.php @@ -2,15 +2,17 @@ start(); @@ -33,7 +35,7 @@ Loop::run(function() { $result = yield $pool->enqueue(new BlockingTask('file_get_contents', $url)); printf("Read from %s: %d bytes\n", $url, strlen($result)); }; - + $coroutines = array_map(function (callable $coroutine): Coroutine { return new Coroutine($coroutine()); }, $coroutines); @@ -42,4 +44,3 @@ Loop::run(function() { return yield $pool->shutdown(); }); - diff --git a/example/worker.php b/example/worker.php index e9abaac..0180dee 100755 --- a/example/worker.php +++ b/example/worker.php @@ -2,8 +2,8 @@ name = $name; $this->trace = $trace; } - + /** * Returns the class name of the uncaught exception. * diff --git a/lib/Process/ChannelledProcess.php b/lib/Process/ChannelledProcess.php index afa4516..194306a 100644 --- a/lib/Process/ChannelledProcess.php +++ b/lib/Process/ChannelledProcess.php @@ -2,17 +2,18 @@ namespace Amp\Parallel\Process; -use function Amp\call; -use Amp\{ Coroutine, Promise }; -use Amp\Parallel\{ - ContextException, - Process as ProcessContext, - StatusError, - Strand, - SynchronizationError -}; -use Amp\Parallel\Sync\{ ChannelException, ChannelledSocket, Internal\ExitResult }; +use Amp\Coroutine; +use Amp\Parallel\ContextException; +use Amp\Parallel\Process as ProcessContext; +use Amp\Parallel\StatusError; +use Amp\Parallel\Strand; +use Amp\Parallel\Sync\ChannelException; +use Amp\Parallel\Sync\ChannelledSocket; +use Amp\Parallel\Sync\Internal\ExitResult; +use Amp\Parallel\SynchronizationError; use Amp\Process\Process; +use Amp\Promise; +use function Amp\call; class ChannelledProcess implements ProcessContext, Strand { /** @var \Amp\Process\Process */ diff --git a/lib/StatusError.php b/lib/StatusError.php index ddd079b..bf8f192 100644 --- a/lib/StatusError.php +++ b/lib/StatusError.php @@ -2,4 +2,5 @@ namespace Amp\Parallel; -class StatusError extends \Error {} +class StatusError extends \Error { +} diff --git a/lib/Strand.php b/lib/Strand.php index 964763d..efd6b6b 100644 --- a/lib/Strand.php +++ b/lib/Strand.php @@ -2,4 +2,5 @@ namespace Amp\Parallel; -interface Strand extends Context, Sync\Channel {} +interface Strand extends Context, Sync\Channel { +} diff --git a/lib/Sync/ChannelledSocket.php b/lib/Sync/ChannelledSocket.php index 68abd8d..1f25f4d 100644 --- a/lib/Sync/ChannelledSocket.php +++ b/lib/Sync/ChannelledSocket.php @@ -2,35 +2,39 @@ namespace Amp\Parallel\Sync; -use Amp\{ Deferred, Failure, Loop, Promise, Success }; +use Amp\Deferred; +use Amp\Failure; +use Amp\Loop; +use Amp\Promise; +use Amp\Success; class ChannelledSocket implements Channel { const HEADER_LENGTH = 5; - + /** @var resource Stream resource. */ private $readResource; - + /** @var resource Stream resource. */ private $writeResource; - + /** @var string onReadable loop watcher. */ private $readWatcher; - + /** @var string onWritable loop watcher. */ private $writeWatcher; - + /** @var \SplQueue Queue of pending reads. */ private $reads; - + /** @var \SplQueue Queue of pending writes. */ private $writes; - + /** @var bool */ private $open = true; - + /** @var bool */ private $autoClose = true; - + /** * @param resource $read Readable stream resource. * @param resource $write Writable stream resource. @@ -42,28 +46,28 @@ class ChannelledSocket implements Channel { if (!\is_resource($read) || \get_resource_type($read) !== 'stream') { throw new \Error('Invalid resource given to constructor!'); } - + if (!\is_resource($write) || \get_resource_type($write) !== 'stream') { throw new \Error('Invalid resource given to constructor!'); } - + $this->readResource = $read; $this->writeResource = $write; $this->autoClose = $autoClose; - + \stream_set_blocking($this->readResource, false); \stream_set_read_buffer($this->readResource, 0); \stream_set_write_buffer($this->readResource, 0); - + if ($this->readResource !== $this->writeResource) { \stream_set_blocking($this->writeResource, false); \stream_set_read_buffer($this->writeResource, 0); \stream_set_write_buffer($this->writeResource, 0); } - + $this->reads = $reads = new \SplQueue; $this->writes = $writes = new \SplQueue; - + $errorHandler = static function ($errno, $errstr, $errfile, $errline) { if ($errno & \error_reporting()) { throw new ChannelException(\sprintf( @@ -75,56 +79,56 @@ class ChannelledSocket implements Channel { )); } }; - + $this->readWatcher = Loop::onReadable($this->readResource, static function ($watcher, $stream) use ($reads, $errorHandler) { while (!$reads->isEmpty()) { /** @var \Amp\Deferred $deferred */ list($buffer, $length, $deferred) = $reads->shift(); - + if ($length === 0) { // Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes. $data = @\fread($stream, self::HEADER_LENGTH - \strlen($buffer)); - + if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) { $deferred->fail(new ChannelException("The socket unexpectedly closed")); break; } - + $buffer .= $data; - + if (\strlen($buffer) !== self::HEADER_LENGTH) { // Not enough data available. $reads->unshift([$buffer, 0, $deferred]); return; } - + $data = \unpack("Cprefix/Llength", $data); - + if ($data["prefix"] !== 0) { $deferred->fail(new ChannelException("Invalid header received")); break; } - + $length = $data["length"]; $buffer = ''; } - + // Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes. $data = @\fread($stream, $length - \strlen($buffer)); - + if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) { $deferred->fail(new ChannelException("The socket unexpectedly closed")); break; } - + $buffer .= $data; - + if (\strlen($buffer) < $length) { // Not enough data available. $reads->unshift([$buffer, $length, $deferred]); return; } - + \set_error_handler($errorHandler); try { @@ -140,10 +144,10 @@ class ChannelledSocket implements Channel { $deferred->fail(new SerializationException("Exception thrown when unserializing data", $exception)); } } - + Loop::disable($watcher); }); - + $this->writeWatcher = Loop::onWritable($this->writeResource, static function ($watcher, $stream) use ($writes) { try { while (!$writes->isEmpty()) { @@ -167,7 +171,7 @@ class ChannelledSocket implements Channel { $exception = new ChannelException($message); $deferred->fail($exception); while (!$writes->isEmpty()) { - list( , , $deferred) = $writes->shift(); + list(, , $deferred) = $writes->shift(); $deferred->fail($exception); } return; @@ -188,17 +192,17 @@ class ChannelledSocket implements Channel { } } }); - + Loop::disable($this->readWatcher); Loop::disable($this->writeWatcher); } - + public function __destruct() { if ($this->readResource !== null) { $this->close(); } } - + /** * {@inheritdoc} */ @@ -206,7 +210,7 @@ class ChannelledSocket implements Channel { if (\is_resource($this->readResource)) { if ($this->autoClose) { @\fclose($this->readResource); - + if ($this->readResource !== $this->writeResource) { @\fclose($this->writeResource); } @@ -214,23 +218,23 @@ class ChannelledSocket implements Channel { $this->readResource = null; $this->writeResource = null; } - + $this->open = false; - + if (!$this->reads->isEmpty()) { $exception = new ChannelException("The connection was unexpectedly closed before reading completed"); do { /** @var \Amp\Deferred $deferred */ - list( , , $deferred) = $this->reads->shift(); + list(, , $deferred) = $this->reads->shift(); $deferred->fail($exception); } while (!$this->reads->isEmpty()); } - + if (!$this->writes->isEmpty()) { $exception = new ChannelException("The connection was unexpectedly writing completed"); do { /** @var \Amp\Deferred $deferred */ - list( , , $deferred) = $this->writes->shift(); + list(, , $deferred) = $this->writes->shift(); $deferred->fail($exception); } while (!$this->writes->isEmpty()); } @@ -238,7 +242,7 @@ class ChannelledSocket implements Channel { Loop::cancel($this->readWatcher); Loop::cancel($this->writeWatcher); } - + /** * {@inheritdoc} */ @@ -246,13 +250,13 @@ class ChannelledSocket implements Channel { if (!$this->open) { return new Failure(new ChannelException("The channel is has been closed")); } - + $deferred = new Deferred; $this->reads->push(["", 0, $deferred]); Loop::enable($this->readWatcher); return $deferred->promise(); } - + /** * @param string $data * @param bool $end @@ -272,7 +276,7 @@ class ChannelledSocket implements Channel { "The given data cannot be sent because it is not serializable.", $exception ); } - + $data = \pack("CL", 0, \strlen($data)) . $data; $length = \strlen($data); $written = 0; @@ -280,7 +284,7 @@ class ChannelledSocket implements Channel { if ($this->writes->isEmpty()) { // Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full. $written = @\fwrite($this->writeResource, $data); - + if ($written === false) { $message = "Failed to write to stream"; if ($error = \error_get_last()) { @@ -288,11 +292,11 @@ class ChannelledSocket implements Channel { } return new Failure(new ChannelException($message)); } - + if ($length <= $written) { return new Success($written); } - + $data = \substr($data, $written); } @@ -301,4 +305,4 @@ class ChannelledSocket implements Channel { Loop::enable($this->writeWatcher); return $deferred->promise(); } -} \ No newline at end of file +} diff --git a/lib/Sync/ChannelledStream.php b/lib/Sync/ChannelledStream.php index 0ed456c..8efbed4 100644 --- a/lib/Sync/ChannelledStream.php +++ b/lib/Sync/ChannelledStream.php @@ -2,8 +2,12 @@ namespace Amp\Parallel\Sync; -use Amp\{ Coroutine, Promise }; -use Amp\ByteStream\{ InputStream, OutputStream, Parser, StreamException }; +use Amp\ByteStream\InputStream; +use Amp\ByteStream\OutputStream; +use Amp\ByteStream\Parser; +use Amp\ByteStream\StreamException; +use Amp\Coroutine; +use Amp\Promise; /** * An asynchronous channel for sending data between threads and processes. diff --git a/lib/Sync/FileMutex.php b/lib/Sync/FileMutex.php index 9a9bb96..ebc9129 100644 --- a/lib/Sync/FileMutex.php +++ b/lib/Sync/FileMutex.php @@ -4,7 +4,6 @@ namespace Amp\Parallel\Sync; use Amp\Coroutine; use Amp\Delayed; -use Amp\Pause; use Amp\Parallel\MutexException; use Amp\Promise; diff --git a/lib/Sync/Internal/ExitFailure.php b/lib/Sync/Internal/ExitFailure.php index 1854c9d..1de73aa 100644 --- a/lib/Sync/Internal/ExitFailure.php +++ b/lib/Sync/Internal/ExitFailure.php @@ -39,4 +39,4 @@ class ExitFailure implements ExitResult { $this->trace ); } -} \ No newline at end of file +} diff --git a/lib/Sync/Internal/ExitSuccess.php b/lib/Sync/Internal/ExitSuccess.php index 1f53d67..bd8ccb6 100644 --- a/lib/Sync/Internal/ExitSuccess.php +++ b/lib/Sync/Internal/ExitSuccess.php @@ -16,4 +16,4 @@ class ExitSuccess implements ExitResult { public function getResult() { return $this->result; } -} \ No newline at end of file +} diff --git a/lib/Sync/LockAlreadyReleasedError.php b/lib/Sync/LockAlreadyReleasedError.php index 8e20c77..daad5db 100644 --- a/lib/Sync/LockAlreadyReleasedError.php +++ b/lib/Sync/LockAlreadyReleasedError.php @@ -2,4 +2,5 @@ namespace Amp\Parallel\Sync; -class LockAlreadyReleasedError extends \Error {} +class LockAlreadyReleasedError extends \Error { +} diff --git a/lib/Sync/Mutex.php b/lib/Sync/Mutex.php index 03273c7..4ced4a6 100644 --- a/lib/Sync/Mutex.php +++ b/lib/Sync/Mutex.php @@ -11,8 +11,7 @@ use Amp\Promise; * are atomic. Implementations do not have to guarantee that acquiring a lock * is first-come, first serve. */ -interface Mutex -{ +interface Mutex { /** * @coroutine * diff --git a/lib/Sync/MutexException.php b/lib/Sync/MutexException.php index dd4559f..86b863b 100644 --- a/lib/Sync/MutexException.php +++ b/lib/Sync/MutexException.php @@ -2,4 +2,5 @@ namespace Amp\Parallel\Sync; -class MutexException extends \Exception {} +class MutexException extends \Exception { +} diff --git a/lib/Sync/PosixSemaphore.php b/lib/Sync/PosixSemaphore.php index 272f21a..553ef50 100644 --- a/lib/Sync/PosixSemaphore.php +++ b/lib/Sync/PosixSemaphore.php @@ -123,7 +123,7 @@ class PosixSemaphore implements Semaphore, \Serializable { public function acquire(): Promise { return new Coroutine($this->doAcquire()); } - + /** * {@inheritdoc} */ diff --git a/lib/Sync/SemaphoreException.php b/lib/Sync/SemaphoreException.php index 767610d..231bb6a 100644 --- a/lib/Sync/SemaphoreException.php +++ b/lib/Sync/SemaphoreException.php @@ -2,4 +2,5 @@ namespace Amp\Parallel\Sync; -class SemaphoreException extends \Exception {} +class SemaphoreException extends \Exception { +} diff --git a/lib/Sync/SerializationException.php b/lib/Sync/SerializationException.php index e5c341f..d88fc55 100644 --- a/lib/Sync/SerializationException.php +++ b/lib/Sync/SerializationException.php @@ -2,4 +2,5 @@ namespace Amp\Parallel\Sync; -class SerializationException extends ChannelException {} +class SerializationException extends ChannelException { +} diff --git a/lib/Sync/SharedMemoryException.php b/lib/Sync/SharedMemoryException.php index 22179ac..aa2a98c 100644 --- a/lib/Sync/SharedMemoryException.php +++ b/lib/Sync/SharedMemoryException.php @@ -2,4 +2,5 @@ namespace Amp\Parallel\Sync; -class SharedMemoryException extends \Exception {} +class SharedMemoryException extends \Exception { +} diff --git a/lib/Sync/SharedMemoryParcel.php b/lib/Sync/SharedMemoryParcel.php index b265064..c288194 100644 --- a/lib/Sync/SharedMemoryParcel.php +++ b/lib/Sync/SharedMemoryParcel.php @@ -2,7 +2,8 @@ namespace Amp\Parallel\Sync; -use Amp\{ Coroutine, Promise }; +use Amp\Coroutine; +use Amp\Promise; /** * A container object for sharing a value across contexts. @@ -164,7 +165,7 @@ class SharedMemoryParcel implements Parcel, \Serializable { public function synchronized(callable $callback): Promise { return new Coroutine($this->doSynchronized($callback)); } - + /** * @coroutine * @@ -175,28 +176,28 @@ class SharedMemoryParcel implements Parcel, \Serializable { private function doSynchronized(callable $callback): \Generator { /** @var \Amp\Parallel\Sync\Lock $lock */ $lock = yield $this->semaphore->acquire(); - + try { $value = $this->unwrap(); $result = $callback($value); - + if ($result instanceof \Generator) { $result = new Coroutine($result); } - + if ($result instanceof Promise) { $result = yield $result; } - + $this->wrap(null === $result ? $value : $result); } finally { $lock->release(); } - + return $result; } - - + + /** * Frees the shared object from memory. * diff --git a/lib/SynchronizationError.php b/lib/SynchronizationError.php index c21bf3a..360dbb5 100644 --- a/lib/SynchronizationError.php +++ b/lib/SynchronizationError.php @@ -2,4 +2,5 @@ namespace Amp\Parallel; -class SynchronizationError extends \Error {} +class SynchronizationError extends \Error { +} diff --git a/lib/Threading/Internal/Mutex.php b/lib/Threading/Internal/Mutex.php index e2fe99f..9074540 100644 --- a/lib/Threading/Internal/Mutex.php +++ b/lib/Threading/Internal/Mutex.php @@ -15,14 +15,14 @@ class Mutex extends \Threaded { /** @var bool */ private $lock = true; - + /** * @return \Amp\Promise */ public function acquire(): Promise { return new Coroutine($this->doAcquire()); } - + /** * Attempts to acquire the lock and sleeps for a time if the lock could not be acquired. * diff --git a/lib/Threading/Internal/Semaphore.php b/lib/Threading/Internal/Semaphore.php index 14e8741..b116f3d 100644 --- a/lib/Threading/Internal/Semaphore.php +++ b/lib/Threading/Internal/Semaphore.php @@ -42,7 +42,7 @@ class Semaphore extends \Threaded { public function acquire(): Promise { return new Coroutine($this->doAcquire()); } - + /** * Uses a double locking mechanism to acquire a lock without blocking. A * synchronous mutex is used to make sure that the semaphore is queried one diff --git a/lib/Threading/Internal/Thread.php b/lib/Threading/Internal/Thread.php index 8e7e839..7c892bb 100644 --- a/lib/Threading/Internal/Thread.php +++ b/lib/Threading/Internal/Thread.php @@ -2,15 +2,15 @@ namespace Amp\Parallel\Threading\Internal; -use Amp\{ Coroutine, Loop, Promise }; -use Amp\Parallel\Sync\{ - Channel, - ChannelException, - ChannelledSocket, - Internal\ExitFailure, - Internal\ExitSuccess, - SerializationException -}; +use Amp\Coroutine; +use Amp\Loop; +use Amp\Parallel\Sync\Channel; +use Amp\Parallel\Sync\ChannelException; +use Amp\Parallel\Sync\ChannelledSocket; +use Amp\Parallel\Sync\Internal\ExitFailure; +use Amp\Parallel\Sync\Internal\ExitSuccess; +use Amp\Parallel\Sync\SerializationException; +use Amp\Promise; /** * An internal thread that executes a given function concurrently. diff --git a/lib/Threading/Mutex.php b/lib/Threading/Mutex.php index ca4b163..9db9dc7 100644 --- a/lib/Threading/Mutex.php +++ b/lib/Threading/Mutex.php @@ -2,8 +2,8 @@ namespace Amp\Parallel\Threading; -use Amp\Promise; use Amp\Parallel\Sync\Mutex as SyncMutex; +use Amp\Promise; /** * A thread-safe, asynchronous mutex using the pthreads locking mechanism. diff --git a/lib/Threading/Parcel.php b/lib/Threading/Parcel.php index bfe5333..3138c49 100644 --- a/lib/Threading/Parcel.php +++ b/lib/Threading/Parcel.php @@ -2,8 +2,9 @@ namespace Amp\Parallel\Threading; -use Amp\{ Coroutine, Promise }; +use Amp\Coroutine; use Amp\Parallel\Sync\Parcel as SyncParcel; +use Amp\Promise; /** * A thread-safe container that shares a value between multiple threads. @@ -52,7 +53,7 @@ class Parcel implements SyncParcel { public function synchronized(callable $callback): Promise { return new Coroutine($this->doSynchronized($callback)); } - + /** * @coroutine * @@ -70,15 +71,15 @@ class Parcel implements SyncParcel { try { $value = $this->unwrap(); $result = $callback($value); - + if ($result instanceof \Generator) { $result = new Coroutine($result); } - + if ($result instanceof Promise) { $result = yield $result; } - + $this->wrap(null === $result ? $value : $result); } finally { $lock->release(); diff --git a/lib/Threading/Semaphore.php b/lib/Threading/Semaphore.php index ac97cb0..bdf75f0 100644 --- a/lib/Threading/Semaphore.php +++ b/lib/Threading/Semaphore.php @@ -2,8 +2,8 @@ namespace Amp\Parallel\Threading; -use Amp\Promise; use Amp\Parallel\Sync\Semaphore as SyncSemaphore; +use Amp\Promise; /** * An asynchronous semaphore based on pthreads' synchronization methods. diff --git a/lib/Threading/Thread.php b/lib/Threading/Thread.php index fa797c8..d0bd962 100644 --- a/lib/Threading/Thread.php +++ b/lib/Threading/Thread.php @@ -2,10 +2,17 @@ namespace Amp\Parallel\Threading; +use Amp\Coroutine; +use Amp\Loop; +use Amp\Parallel\ContextException; +use Amp\Parallel\StatusError; +use Amp\Parallel\Strand; +use Amp\Parallel\Sync\ChannelException; +use Amp\Parallel\Sync\ChannelledSocket; +use Amp\Parallel\Sync\Internal\ExitResult; +use Amp\Parallel\SynchronizationError; +use Amp\Promise; use function Amp\call; -use Amp\{ Coroutine, Loop, Promise }; -use Amp\Parallel\{ ContextException, StatusError, SynchronizationError, Strand }; -use Amp\Parallel\Sync\{ ChannelException, ChannelledSocket, Internal\ExitResult }; /** * Implements an execution context using native multi-threading. diff --git a/lib/Worker/AbstractWorker.php b/lib/Worker/AbstractWorker.php index 8e5669c..eef7805 100644 --- a/lib/Worker/AbstractWorker.php +++ b/lib/Worker/AbstractWorker.php @@ -2,9 +2,14 @@ namespace Amp\Parallel\Worker; -use Amp\{ Coroutine, Deferred, Promise }; -use Amp\Parallel\{ StatusError, Strand } ; -use Amp\Parallel\Worker\Internal\{ Job, TaskResult }; +use Amp\Coroutine; +use Amp\Deferred; +use Amp\Parallel\StatusError; +use Amp\Parallel\Strand; +use Amp\Parallel\Worker\Internal\Job; + +use Amp\Parallel\Worker\Internal\TaskResult; +use Amp\Promise; /** * Base class for most common types of task workers. @@ -15,10 +20,10 @@ abstract class AbstractWorker implements Worker { /** @var bool */ private $shutdown = false; - + /** @var \Amp\Deferred[] */ private $jobQueue = []; - + /** @var callable */ private $when; @@ -27,32 +32,32 @@ abstract class AbstractWorker implements Worker { */ public function __construct(Strand $strand) { $this->context = $strand; - + $this->when = function ($exception, $data) { if ($exception) { $this->kill(); return; } - + if (!$data instanceof TaskResult) { $this->kill(); return; } - + $id = $data->getId(); - + if (!isset($this->jobQueue[$id])) { $this->kill(); return; } - + $deferred = $this->jobQueue[$id]; unset($this->jobQueue[$id]); - + if (!empty($this->jobQueue)) { $this->context->receive()->onResolve($this->when); } - + $deferred->resolve($data->promise()); }; } @@ -85,11 +90,11 @@ abstract class AbstractWorker implements Worker { if (!$this->context->isRunning()) { throw new StatusError('The worker has not been started.'); } - + if ($this->shutdown) { throw new StatusError('The worker has been shut down.'); } - + return new Coroutine($this->doEnqueue($task)); } @@ -108,7 +113,7 @@ abstract class AbstractWorker implements Worker { if (empty($this->jobQueue)) { $this->context->receive()->onResolve($this->when); } - + try { $job = new Job($task); $this->jobQueue[$job->getId()] = $deferred = new Deferred; @@ -117,7 +122,7 @@ abstract class AbstractWorker implements Worker { $this->kill(); throw new WorkerException('Sending the task to the worker failed.', $exception); } - + return yield $deferred->promise(); } @@ -128,7 +133,7 @@ abstract class AbstractWorker implements Worker { if (!$this->context->isRunning() || $this->shutdown) { throw new StatusError('The worker is not running.'); } - + return new Coroutine($this->doShutdown()); } @@ -161,11 +166,11 @@ abstract class AbstractWorker implements Worker { private function cancelPending() { if (!empty($this->jobQueue)) { $exception = new WorkerException('Worker was shut down.'); - + foreach ($this->jobQueue as $job) { $job->fail($exception); } - + $this->jobQueue = []; } } diff --git a/lib/Worker/BasicEnvironment.php b/lib/Worker/BasicEnvironment.php index 60beb46..5d22e20 100644 --- a/lib/Worker/BasicEnvironment.php +++ b/lib/Worker/BasicEnvironment.php @@ -43,7 +43,7 @@ class BasicEnvironment implements Environment { Loop::disable($this->timer); } }); - + Loop::disable($this->timer); Loop::unreference($this->timer); } @@ -70,7 +70,7 @@ class BasicEnvironment implements Environment { return isset($this->data[$key]) ? $this->data[$key] : null; } - + /** * @param string $key * @param mixed $value Using null for the value deletes the key. diff --git a/lib/Worker/DefaultPool.php b/lib/Worker/DefaultPool.php index 422ee42..3a05281 100644 --- a/lib/Worker/DefaultPool.php +++ b/lib/Worker/DefaultPool.php @@ -2,8 +2,10 @@ namespace Amp\Parallel\Worker; -use Amp\{ CallableMaker, Coroutine, Promise }; +use Amp\CallableMaker; +use Amp\Coroutine; use Amp\Parallel\StatusError; +use Amp\Promise; /** * Provides a pool of workers that can be used to execute multiple tasks asynchronously. @@ -14,7 +16,7 @@ use Amp\Parallel\StatusError; */ class DefaultPool implements Pool { use CallableMaker; - + /** @var bool Indicates if the pool is currently running. */ private $running = false; @@ -156,7 +158,7 @@ class DefaultPool implements Pool { public function enqueue(Task $task): Promise { return new Coroutine($this->doEnqueue($this->pull(), $task)); } - + /** * @coroutine * @@ -173,7 +175,7 @@ class DefaultPool implements Pool { } finally { $this->push($worker); } - + return $result; } @@ -190,7 +192,7 @@ class DefaultPool implements Pool { if (!$this->isRunning()) { throw new StatusError('The pool is not running.'); } - + return new Coroutine($this->doShutdown()); } @@ -247,7 +249,7 @@ class DefaultPool implements Pool { public function get(): Worker { return new Internal\PooledWorker($this->pull(), $this->push); } - + /** * Pulls a worker from the pool. The worker should be put back into the pool with push() to be marked as idle. * @@ -282,7 +284,7 @@ class DefaultPool implements Pool { $this->busyQueue->push($worker); $this->workers[$worker] += 1; - + return $worker; } diff --git a/lib/Worker/Internal/Job.php b/lib/Worker/Internal/Job.php index b33cf63..9d71766 100644 --- a/lib/Worker/Internal/Job.php +++ b/lib/Worker/Internal/Job.php @@ -7,19 +7,19 @@ use Amp\Parallel\Worker\Task; class Job { /** @var string */ private $id; - + /** @var \Amp\Parallel\Worker\Task */ private $task; - + public function __construct(Task $task) { $this->task = $task; $this->id = \spl_object_hash($this->task); } - + public function getId(): string { return $this->id; } - + public function getTask(): Task { return $this->task; } diff --git a/lib/Worker/Internal/PooledWorker.php b/lib/Worker/Internal/PooledWorker.php index 674e2c9..775c57b 100644 --- a/lib/Worker/Internal/PooledWorker.php +++ b/lib/Worker/Internal/PooledWorker.php @@ -2,7 +2,8 @@ namespace Amp\Parallel\Worker\Internal; -use Amp\Parallel\Worker\{ Task, Worker }; +use Amp\Parallel\Worker\Task; +use Amp\Parallel\Worker\Worker; use Amp\Promise; class PooledWorker implements Worker { @@ -69,4 +70,4 @@ class PooledWorker implements Worker { public function kill() { $this->worker->kill(); } -} \ No newline at end of file +} diff --git a/lib/Worker/Internal/TaskFailure.php b/lib/Worker/Internal/TaskFailure.php index 3ad381b..6203998 100644 --- a/lib/Worker/Internal/TaskFailure.php +++ b/lib/Worker/Internal/TaskFailure.php @@ -34,7 +34,7 @@ class TaskFailure extends TaskResult { $this->code = $exception->getCode(); $this->trace = $exception->getTraceAsString(); } - + public function promise(): Promise { switch ($this->parent) { case self::PARENT_ERROR: @@ -57,4 +57,4 @@ class TaskFailure extends TaskResult { return new Failure($exception); } -} \ No newline at end of file +} diff --git a/lib/Worker/Internal/TaskResult.php b/lib/Worker/Internal/TaskResult.php index 2e993a8..2083fb3 100644 --- a/lib/Worker/Internal/TaskResult.php +++ b/lib/Worker/Internal/TaskResult.php @@ -7,23 +7,23 @@ use Amp\Promise; abstract class TaskResult { /** @var string Task identifier. */ private $id; - + /** * @param string $id Task identifier. */ public function __construct(string $id) { $this->id = $id; } - + /** * @return string Task identifier. */ public function getId(): string { return $this->id; } - + /** * @return \Amp\Promise Resolved with the task result or failure reason. */ abstract public function promise(): Promise; -} \ No newline at end of file +} diff --git a/lib/Worker/Internal/TaskRunner.php b/lib/Worker/Internal/TaskRunner.php index 9a61da8..4d540bc 100644 --- a/lib/Worker/Internal/TaskRunner.php +++ b/lib/Worker/Internal/TaskRunner.php @@ -2,9 +2,12 @@ namespace Amp\Parallel\Worker\Internal; -use Amp\{ Coroutine, Failure, Success }; -use Amp\Parallel\{ Sync\Channel, Worker\Environment }; +use Amp\Coroutine; +use Amp\Failure; +use Amp\Parallel\Sync\Channel; +use Amp\Parallel\Worker\Environment; use Amp\Promise; +use Amp\Success; class TaskRunner { /** @var \Amp\Parallel\Sync\Channel */ @@ -17,7 +20,7 @@ class TaskRunner { $this->channel = $channel; $this->environment = $environment; } - + /** * Runs the task runner, receiving tasks from the parent and sending the result of those tasks. * @@ -26,7 +29,7 @@ class TaskRunner { public function run(): Promise { return new Coroutine($this->execute()); } - + /** * @coroutine * @@ -37,28 +40,28 @@ class TaskRunner { while ($job instanceof Job) { $task = $job->getTask(); - + try { $result = $task->run($this->environment); - + if ($result instanceof \Generator) { $result = new Coroutine($result); } - + if (!$result instanceof Promise) { $result = new Success($result); } } catch (\Throwable $exception) { $result = new Failure($exception); } - + $result->onResolve(function ($exception, $value) use ($job) { if ($exception) { $result = new TaskFailure($job->getId(), $exception); } else { $result = new TaskSuccess($job->getId(), $value); } - + $this->channel->send($result); }); diff --git a/lib/Worker/Internal/TaskSuccess.php b/lib/Worker/Internal/TaskSuccess.php index a5c6309..383a987 100644 --- a/lib/Worker/Internal/TaskSuccess.php +++ b/lib/Worker/Internal/TaskSuccess.php @@ -2,18 +2,18 @@ namespace Amp\Parallel\Worker\Internal; -use Amp\Success; use Amp\Promise; +use Amp\Success; class TaskSuccess extends TaskResult { /** @var mixed Result of task. */ private $result; - + public function __construct(string $id, $result) { parent::__construct($id); $this->result = $result; } - + public function promise(): Promise { return new Success($this->result); } diff --git a/lib/Worker/TaskError.php b/lib/Worker/TaskError.php index 14d4fdb..c57fdf5 100644 --- a/lib/Worker/TaskError.php +++ b/lib/Worker/TaskError.php @@ -20,7 +20,7 @@ class TaskError extends \Error { $this->name = $name; $this->trace = $trace; } - + /** * Returns the class name of the error thrown from the task. * @@ -29,7 +29,7 @@ class TaskError extends \Error { public function getName(): string { return $this->name; } - + /** * Gets the stack trace at the point the error was thrown in the task. * diff --git a/lib/Worker/TaskException.php b/lib/Worker/TaskException.php index 75fe7d2..d460111 100644 --- a/lib/Worker/TaskException.php +++ b/lib/Worker/TaskException.php @@ -20,7 +20,7 @@ class TaskException extends \Exception { $this->name = $name; $this->trace = $trace; } - + /** * Returns the class name of the exception thrown from the task. * @@ -29,7 +29,7 @@ class TaskException extends \Exception { public function getName(): string { return $this->name; } - + /** * Gets the stack trace at the point the exception was thrown in the task. * diff --git a/lib/Worker/WorkerThread.php b/lib/Worker/WorkerThread.php index 0eec450..a937439 100644 --- a/lib/Worker/WorkerThread.php +++ b/lib/Worker/WorkerThread.php @@ -2,9 +2,9 @@ namespace Amp\Parallel\Worker; -use Amp\Promise; use Amp\Parallel\Threading\Thread; use Amp\Parallel\Worker\Internal\TaskRunner; +use Amp\Promise; /** * A worker thread that executes task objects. diff --git a/lib/Worker/functions.php b/lib/Worker/functions.php index 1340c0f..044739a 100644 --- a/lib/Worker/functions.php +++ b/lib/Worker/functions.php @@ -2,7 +2,8 @@ namespace Amp\Parallel\Worker; -use Amp\{ Loop, Promise }; +use Amp\Loop; +use Amp\Promise; const LOOP_POOL_IDENTIFIER = Pool::class; const LOOP_FACTORY_IDENTIFIER = WorkerFactory::class; diff --git a/test/AbstractContextTest.php b/test/AbstractContextTest.php index 0d9b6b4..2f7f80a 100644 --- a/test/AbstractContextTest.php +++ b/test/AbstractContextTest.php @@ -113,7 +113,6 @@ abstract class AbstractContextTest extends TestCase { $context->start(); yield $context->join(); }); - }, 1000); } diff --git a/test/Forking/ForkTest.php b/test/Forking/ForkTest.php index ded36d8..4e72035 100644 --- a/test/Forking/ForkTest.php +++ b/test/Forking/ForkTest.php @@ -2,9 +2,9 @@ namespace Amp\Parallel\Test\Forking; +use Amp\Loop; use Amp\Parallel\Forking\Fork; use Amp\Parallel\Test\AbstractContextTest; -use Amp\Loop; /** * @group forking diff --git a/test/Sync/AbstractSemaphoreTest.php b/test/Sync/AbstractSemaphoreTest.php index 88c43f2..b5c9d03 100644 --- a/test/Sync/AbstractSemaphoreTest.php +++ b/test/Sync/AbstractSemaphoreTest.php @@ -96,13 +96,13 @@ abstract class AbstractSemaphoreTest extends TestCase { $callback = function () { $awaitable1 = $this->semaphore->acquire(); $awaitable2 = $this->semaphore->acquire(); - + yield new Delayed(500); - + (yield $awaitable1)->release(); - + yield new Delayed(500); - + (yield $awaitable2)->release(); }; diff --git a/test/Sync/ChannelledSocketTest.php b/test/Sync/ChannelledSocketTest.php index cc3606c..a93f00a 100644 --- a/test/Sync/ChannelledSocketTest.php +++ b/test/Sync/ChannelledSocketTest.php @@ -20,7 +20,7 @@ class ChannelledSocketTest extends TestCase { } return $sockets; } - + public function testSendReceive() { Loop::run(function () { list($left, $right) = $this->createSockets(); @@ -54,7 +54,6 @@ class ChannelledSocketTest extends TestCase { $data = yield $b->receive(); $this->assertSame($message, $data); }); - } /** @@ -70,7 +69,6 @@ class ChannelledSocketTest extends TestCase { fwrite($left, pack('L', 10) . '1234567890'); $data = yield $b->receive(); }); - } /** @@ -87,7 +85,6 @@ class ChannelledSocketTest extends TestCase { yield $a->send(function () {}); $data = yield $b->receive(); }); - } /** @@ -102,7 +99,6 @@ class ChannelledSocketTest extends TestCase { yield $a->send('hello'); }); - } /** @@ -117,6 +113,5 @@ class ChannelledSocketTest extends TestCase { $data = yield $a->receive(); }); - } } diff --git a/test/Sync/ChannelledStreamTest.php b/test/Sync/ChannelledStreamTest.php index 4213729..ac0302d 100644 --- a/test/Sync/ChannelledStreamTest.php +++ b/test/Sync/ChannelledStreamTest.php @@ -3,8 +3,8 @@ namespace Amp\Parallel\Test\Sync; use Amp\ByteStream\InputStream; -use Amp\ByteStream\StreamException; use Amp\ByteStream\OutputStream; +use Amp\ByteStream\StreamException; use Amp\Loop; use Amp\Parallel\Sync\ChannelledStream; use Amp\PHPUnit\TestCase; @@ -73,7 +73,6 @@ class ChannelledStreamTest extends TestCase { $data = yield $b->receive(); $this->assertSame($message, $data); }); - } /** @@ -90,7 +89,6 @@ class ChannelledStreamTest extends TestCase { yield $mock->write(pack('L', 10) . '1234567890'); $data = yield $b->receive(); }); - } /** @@ -107,7 +105,6 @@ class ChannelledStreamTest extends TestCase { yield $a->send(function () {}); $data = yield $b->receive(); }); - } /** @@ -129,7 +126,6 @@ class ChannelledStreamTest extends TestCase { yield $a->send('hello'); }); - } /** @@ -147,6 +143,5 @@ class ChannelledStreamTest extends TestCase { $data = yield $a->receive(); }); - } } diff --git a/test/Sync/FileMutexTest.php b/test/Sync/FileMutexTest.php index 067fdbb..5f6f2a3 100644 --- a/test/Sync/FileMutexTest.php +++ b/test/Sync/FileMutexTest.php @@ -14,7 +14,6 @@ class FileMutexTest extends TestCase { $lock->release(); $this->assertTrue($lock->isReleased()); }); - } public function testAcquireMultiple() { diff --git a/test/Sync/PosixSemaphoreTest.php b/test/Sync/PosixSemaphoreTest.php index 339546c..fcf05fd 100644 --- a/test/Sync/PosixSemaphoreTest.php +++ b/test/Sync/PosixSemaphoreTest.php @@ -2,9 +2,10 @@ namespace Amp\Parallel\Test\Sync; -use Amp\Parallel\Forking\Fork; -use Amp\Parallel\Sync\{ PosixSemaphore, Semaphore }; use Amp\Loop; +use Amp\Parallel\Forking\Fork; +use Amp\Parallel\Sync\PosixSemaphore; +use Amp\Parallel\Sync\Semaphore; /** * @group posix @@ -40,7 +41,6 @@ class PosixSemaphoreTest extends AbstractSemaphoreTest { $clone->free(); }); - } public function testFree() { diff --git a/test/Threading/MutexTest.php b/test/Threading/MutexTest.php index 894954a..efdbce7 100644 --- a/test/Threading/MutexTest.php +++ b/test/Threading/MutexTest.php @@ -18,7 +18,6 @@ class MutexTest extends TestCase { $lock->release(); $this->assertTrue($lock->isReleased()); }); - } public function testAcquireMultiple() { diff --git a/test/Threading/ParcelTest.php b/test/Threading/ParcelTest.php index 4b10040..07ca323 100644 --- a/test/Threading/ParcelTest.php +++ b/test/Threading/ParcelTest.php @@ -2,8 +2,8 @@ namespace Amp\Parallel\Test\Threading; -use Amp\Parallel\Threading\Parcel; use Amp\Parallel\Test\Sync\AbstractParcelTest; +use Amp\Parallel\Threading\Parcel; /** * @requires extension pthreads diff --git a/test/Threading/SemaphoreTest.php b/test/Threading/SemaphoreTest.php index 200ce6a..5d49667 100644 --- a/test/Threading/SemaphoreTest.php +++ b/test/Threading/SemaphoreTest.php @@ -4,8 +4,9 @@ namespace Amp\Parallel\Test\Threading; use Amp\Loop; use Amp\Parallel\Sync\Semaphore as SyncSemaphore; -use Amp\Parallel\Threading\{Semaphore, Thread}; use Amp\Parallel\Test\Sync\AbstractSemaphoreTest; +use Amp\Parallel\Threading\Semaphore; +use Amp\Parallel\Threading\Thread; /** * @group threading diff --git a/test/Threading/ThreadTest.php b/test/Threading/ThreadTest.php index b8a5a08..2b9f39b 100644 --- a/test/Threading/ThreadTest.php +++ b/test/Threading/ThreadTest.php @@ -3,8 +3,8 @@ namespace Amp\Parallel\Test\Threading; use Amp\Loop; -use Amp\Parallel\Threading\Thread; use Amp\Parallel\Test\AbstractContextTest; +use Amp\Parallel\Threading\Thread; /** * @group threading @@ -25,6 +25,5 @@ class ThreadTest extends AbstractContextTest { return yield $thread->join(); }); - } } diff --git a/test/Worker/AbstractWorkerTest.php b/test/Worker/AbstractWorkerTest.php index 7aba497..9d61a84 100644 --- a/test/Worker/AbstractWorkerTest.php +++ b/test/Worker/AbstractWorkerTest.php @@ -51,7 +51,7 @@ abstract class AbstractWorkerTest extends TestCase { Loop::run(function () { $worker = $this->createWorker(); $worker->start(); - + $values = yield \Amp\Promise\all([ $worker->enqueue(new TestTask(42)), $worker->enqueue(new TestTask(56)), diff --git a/test/Worker/FunctionsTest.php b/test/Worker/FunctionsTest.php index fd0707e..2369625 100644 --- a/test/Worker/FunctionsTest.php +++ b/test/Worker/FunctionsTest.php @@ -3,10 +3,13 @@ namespace Amp\Parallel\Test\Worker; use Amp\Parallel\Worker; -use Amp\Parallel\Worker\{ Environment, Pool, Task, WorkerFactory }; +use Amp\Parallel\Worker\Environment; +use Amp\Parallel\Worker\Pool; +use Amp\Parallel\Worker\Task; +use Amp\Parallel\Worker\WorkerFactory; use Amp\PHPUnit\TestCase; -use Amp\Success; use Amp\Promise; +use Amp\Success; class FunctionsTest extends TestCase { public function testPool() { diff --git a/test/Worker/ProcessPoolTest.php b/test/Worker/ProcessPoolTest.php index 41c799f..422b46d 100644 --- a/test/Worker/ProcessPoolTest.php +++ b/test/Worker/ProcessPoolTest.php @@ -2,7 +2,9 @@ namespace Amp\Parallel\Test\Worker; -use Amp\Parallel\Worker\{ DefaultPool, WorkerFactory, WorkerProcess }; +use Amp\Parallel\Worker\DefaultPool; +use Amp\Parallel\Worker\WorkerFactory; +use Amp\Parallel\Worker\WorkerProcess; /** * @group process diff --git a/test/Worker/TestTask.php b/test/Worker/TestTask.php index be08460..042e7e1 100644 --- a/test/Worker/TestTask.php +++ b/test/Worker/TestTask.php @@ -2,7 +2,8 @@ namespace Amp\Parallel\Test\Worker; -use Amp\Parallel\Worker\{ Environment, Task }; +use Amp\Parallel\Worker\Environment; +use Amp\Parallel\Worker\Task; class TestTask implements Task { private $returnValue; diff --git a/test/Worker/ThreadPoolTest.php b/test/Worker/ThreadPoolTest.php index 0706a4d..acbabab 100644 --- a/test/Worker/ThreadPoolTest.php +++ b/test/Worker/ThreadPoolTest.php @@ -2,7 +2,9 @@ namespace Amp\Parallel\Test\Worker; -use Amp\Parallel\Worker\{ DefaultPool, WorkerFactory, WorkerThread }; +use Amp\Parallel\Worker\DefaultPool; +use Amp\Parallel\Worker\WorkerFactory; +use Amp\Parallel\Worker\WorkerThread; /** * @group threading