1
0
mirror of https://github.com/danog/psalm.git synced 2025-01-22 05:41:20 +01:00

Use Amp for handling language server event loop

This commit is contained in:
Matthew Brown 2019-01-19 00:58:08 -05:00
parent dfe45e7d26
commit 263a4c8cf1
13 changed files with 313 additions and 58 deletions

View File

@ -19,10 +19,10 @@
"felixfbecker/language-server-protocol": "^1.2",
"felixfbecker/advanced-json-rpc": "^3.0.3",
"netresearch/jsonmapper": "^1.0",
"sabre/event": "^5.0.1",
"webmozart/glob": "^4.1",
"webmozart/path-util": "^2.3",
"symfony/console": "^3.0||^4.0"
"symfony/console": "^3.0||^4.0",
"amphp/amp": "^2.1"
},
"bin": ["psalm", "psalter", "psalm-language-server", "psalm-plugin"],
"autoload": {

View File

@ -16,7 +16,7 @@ use Psalm\Internal\Provider\ParserCacheProvider;
use Psalm\Internal\Provider\Providers;
use Psalm\Internal\Provider\StatementsProvider;
use Psalm\Type;
use Sabre\Event\Loop;
use Amp\Loop;
/**
* @internal
@ -265,7 +265,7 @@ class ProjectAnalyzer
new ProtocolStreamWriter($socket),
$this
);
Loop\run();
\Amp\Loop::run();
} elseif ($socket_server_mode && $address) {
// Run a TCP Server
$tcpServer = stream_socket_server('tcp://' . $address, $errno, $errstr);
@ -304,7 +304,6 @@ class ProjectAnalyzer
new ProtocolStreamWriter($socket),
$this
);
Loop\run();
// Just for safety
exit(0);
}
@ -316,7 +315,7 @@ class ProjectAnalyzer
new ProtocolStreamWriter($socket),
$this
);
Loop\run();
\Amp\Loop::run();
}
}
} else {
@ -327,7 +326,7 @@ class ProjectAnalyzer
new ProtocolStreamWriter(STDOUT),
$this
);
Loop\run();
\Amp\Loop::run();
}
}

View File

@ -5,7 +5,7 @@ namespace Psalm\Internal\LanguageServer\Client;
use Psalm\Internal\LanguageServer\ClientHandler;
use LanguageServerProtocol\{Diagnostic, TextDocumentItem, TextDocumentIdentifier};
use Sabre\Event\Promise;
use Amp\Promise;
use JsonMapper;
/**
@ -53,10 +53,12 @@ class TextDocument
*/
public function xcontent(TextDocumentIdentifier $textDocument): Promise
{
return $this->handler->request(
$promise = $this->handler->request(
'textDocument/xcontent',
['textDocument' => $textDocument]
)->then(
);
$promise->onResolve(
/**
* @param object $result
* @return object
@ -65,5 +67,7 @@ class TextDocument
return $this->mapper->map($result, new TextDocumentItem);
}
);
return $promise;
}
}

View File

@ -4,7 +4,7 @@ declare(strict_types = 1);
namespace Psalm\Internal\LanguageServer;
use AdvancedJsonRpc;
use Sabre\Event\Promise;
use Amp\Promise;
/**
* @internal
@ -38,27 +38,31 @@ class ClientHandler
*
* @param string $method The method to call
* @param array|object $params The method parameters
* @return Promise <mixed> Resolved with the result of the request or rejected with an error
* @return Promise Resolved with the result of the request or rejected with an error
*/
public function request(string $method, $params): Promise
{
$id = $this->idGenerator->generate();
return $this->protocolWriter->write(
$promise = $this->protocolWriter->write(
new Message(
new AdvancedJsonRpc\Request($id, $method, (object)$params)
)
)->then(
);
$promise->onResolve(
/**
* @return Promise
*/
function () use ($id) {
$promise = new Promise;
$deferred = new \Amp\Deferred();
$listener =
/**
* @param callable $listener
* @return void
*/
function (Message $msg) use ($id, $promise, &$listener) {
function (Message $msg) use ($id, $deferred, &$listener) {
/**
* @psalm-suppress UndefinedPropertyFetch
* @psalm-suppress MixedArgument
@ -70,16 +74,18 @@ class ClientHandler
// Received a response
$this->protocolReader->removeListener('message', $listener);
if (AdvancedJsonRpc\SuccessResponse::isSuccessResponse($msg->body)) {
$promise->fulfill($msg->body->result);
$deferred->resolve($msg->body->result);
} else {
$promise->reject($msg->body->error);
$deferred->fail($msg->body->error);
}
}
};
$this->protocolReader->on('message', $listener);
return $promise;
return $deferred->promise();
}
);
return $promise;
}
/**

View File

@ -0,0 +1,71 @@
<?php declare (strict_types=1);
namespace Psalm\Internal\LanguageServer;
/**
* Event Emitter Interface
*
* Anything that accepts listeners and emits events should implement this
* interface.
*
* @copyright Copyright (C) fruux GmbH (https://fruux.com/)
* @author Evert Pot (http://evertpot.com/)
* @license http://sabre.io/license/ Modified BSD License
*/
interface EmitterInterface
{
/**
* Subscribe to an event.
*
* @return void
*/
public function on(string $eventName, callable $callBack, int $priority = 100);
/**
* Emits an event.
*
* This method will return true if 0 or more listeners were succesfully
* handled. false is returned if one of the events broke the event chain.
*
* If the continueCallBack is specified, this callback will be called every
* time before the next event handler is called.
*
* If the continueCallback returns false, event propagation stops. This
* allows you to use the eventEmitter as a means for listeners to implement
* functionality in your application, and break the event loop as soon as
* some condition is fulfilled.
*
* Note that returning false from an event subscriber breaks propagation
* and returns false, but if the continue-callback stops propagation, this
* is still considered a 'successful' operation and returns true.
*
* Lastly, if there are 5 event handlers for an event. The continueCallback
* will be called at most 4 times.
*
* @param array<int, mixed> $arguments
*/
public function emit(
string $eventName,
array $arguments = [],
callable $continueCallBack = null
) : bool;
/**
* Returns the list of listeners for an event.
*
* The list is returned as an array, and the list of events are sorted by
* their priority.
*
* @return callable[]
*/
public function listeners(string $eventName) : array;
/**
* Removes a specific listener from an event.
*
* If the listener could not be found, this method will return false. If it
* was removed it will return true.
*/
public function removeListener(string $eventName, callable $listener) : bool;
}

View File

@ -0,0 +1,153 @@
<?php declare (strict_types=1);
namespace Psalm\Internal\LanguageServer;
/**
* Event Emitter Trait
*
* This trait contains all the basic functions to implement an
* EventEmitterInterface.
*
* Using the trait + interface allows you to add EventEmitter capabilities
* without having to change your base-class.
*
* @copyright Copyright (C) fruux GmbH (https://fruux.com/)
* @author Evert Pot (http://evertpot.com/)
* @license http://sabre.io/license/ Modified BSD License
*/
trait EmitterTrait
{
/**
* The list of listeners
*
* @var array<string, array{0: bool, 1: int[], 2: callable[]}>
*/
protected $listeners = [];
/**
* Subscribe to an event.
*
* @return void
*/
public function on(string $eventName, callable $callBack, int $priority = 100)
{
if (!isset($this->listeners[$eventName])) {
$this->listeners[$eventName] = [
true, // If there's only one item, it's sorted
[$priority],
[$callBack]
];
} else {
$this->listeners[$eventName][0] = false; // marked as unsorted
$this->listeners[$eventName][1][] = $priority;
$this->listeners[$eventName][2][] = $callBack;
}
}
/**
* Emits an event.
*
* This method will return true if 0 or more listeners were succesfully
* handled. false is returned if one of the events broke the event chain.
*
* If the continueCallBack is specified, this callback will be called every
* time before the next event handler is called.
*
* If the continueCallback returns false, event propagation stops. This
* allows you to use the eventEmitter as a means for listeners to implement
* functionality in your application, and break the event loop as soon as
* some condition is fulfilled.
*
* Note that returning false from an event subscriber breaks propagation
* and returns false, but if the continue-callback stops propagation, this
* is still considered a 'successful' operation and returns true.
*
* Lastly, if there are 5 event handlers for an event. The continueCallback
* will be called at most 4 times.
*
* @param array<int, mixed> $arguments
*/
public function emit(
string $eventName,
array $arguments = [],
callable $continueCallBack = null
) : bool {
if (\is_null($continueCallBack)) {
foreach ($this->listeners($eventName) as $listener) {
/** @psalm-suppress MixedAssignment */
$result = \call_user_func_array($listener, $arguments);
if ($result === false) {
return false;
}
}
} else {
$listeners = $this->listeners($eventName);
$counter = \count($listeners);
foreach ($listeners as $listener) {
$counter--;
/** @psalm-suppress MixedAssignment */
$result = \call_user_func_array($listener, $arguments);
if ($result === false) {
return false;
}
if ($counter > 0) {
if (!$continueCallBack()) {
break;
}
}
}
}
return true;
}
/**
* Returns the list of listeners for an event.
*
* The list is returned as an array, and the list of events are sorted by
* their priority.
*
* @return callable[]
*/
public function listeners(string $eventName) : array
{
if (!isset($this->listeners[$eventName])) {
return [];
}
// The list is not sorted
if (!$this->listeners[$eventName][0]) {
// Sorting
/** @psalm-suppress MixedArgument */
\array_multisort($this->listeners[$eventName][1], SORT_NUMERIC, $this->listeners[$eventName][2]);
// Marking the listeners as sorted
$this->listeners[$eventName][0] = true;
}
return $this->listeners[$eventName][2];
}
/**
* Removes a specific listener from an event.
*
* If the listener could not be found, this method will return false. If it
* was removed it will return true.
*/
public function removeListener(string $eventName, callable $listener) : bool
{
if (!isset($this->listeners[$eventName])) {
return false;
}
foreach ($this->listeners[$eventName][2] as $index => $check) {
if ($check === $listener) {
unset($this->listeners[$eventName][1][$index]);
unset($this->listeners[$eventName][2][$index]);
return true;
}
}
return false;
}
}

View File

@ -25,9 +25,8 @@ use Psalm\Internal\LanguageServer\Cache\{FileSystemCache, ClientCache};
use Psalm\Internal\LanguageServer\Server\TextDocument;
use LanguageServerProtocol\{Range, Position, Diagnostic, DiagnosticSeverity};
use AdvancedJsonRpc;
use Sabre\Event\Loop;
use Sabre\Event\Promise;
use function Sabre\Event\coroutine;
use Amp\Promise;
use function Amp\coroutine;
use Throwable;
use Webmozart\PathUtil\Path;
@ -102,7 +101,7 @@ class LanguageServer extends AdvancedJsonRpc\Dispatcher
'message',
/** @return void */
function (Message $msg) {
coroutine(
\Amp\call(
/** @return \Generator<int, Promise, mixed, void> */
function () use ($msg) {
if (!$msg->body) {
@ -113,6 +112,7 @@ class LanguageServer extends AdvancedJsonRpc\Dispatcher
if (AdvancedJsonRpc\Response::isResponse($msg->body)) {
return;
}
$result = null;
$error = null;
try {
@ -150,17 +150,22 @@ class LanguageServer extends AdvancedJsonRpc\Dispatcher
$this->protocolWriter->write(new Message($responseBody));
}
}
)->otherwise('\Psalm\Internal\LanguageServer\LanguageServer::crash');
);
}
);
$this->protocolReader->on(
'readMessageGroup',
/** @return void */
function () {
\Amp\call(
/** @return null */
function () {
$this->doAnalysis();
}
);
}
);
$this->client = new LanguageClient($reader, $writer);
}
@ -181,7 +186,7 @@ class LanguageServer extends AdvancedJsonRpc\Dispatcher
string $rootPath = null,
int $processId = null
): Promise {
return coroutine(
return \Amp\call(
/** @return \Generator<int, true, mixed, InitializeResult> */
function () use ($capabilities, $rootPath, $processId) {
// Eventually, this might block on something. Leave it as a generator.
@ -448,7 +453,7 @@ class LanguageServer extends AdvancedJsonRpc\Dispatcher
*/
public static function crash(Throwable $err)
{
Loop\nextTick(
\Amp\Loop::defer(
/** @return void */
function () use ($err) {
throw $err;

View File

@ -3,8 +3,6 @@ declare(strict_types = 1);
namespace Psalm\Internal\LanguageServer;
use Sabre\Event\EmitterInterface;
/**
* Must emit a "message" event with a Message object as parameter
* when a message comes in

View File

@ -6,19 +6,23 @@ namespace Psalm\Internal\LanguageServer;
use AdvancedJsonRpc\Message as MessageBody;
use Exception;
use Psalm\Internal\LanguageServer\Message;
use Sabre\Event\Emitter;
use Sabre\Event\Loop;
use Amp\Loop;
/**
* Source: https://github.com/felixfbecker/php-language-server/tree/master/src/ProtocolStreamReader.php
*/
class ProtocolStreamReader extends Emitter implements ProtocolReader
class ProtocolStreamReader implements ProtocolReader
{
use EmitterTrait;
const PARSE_HEADERS = 1;
const PARSE_BODY = 2;
/** @var resource */
private $input;
/** @var string */
private $read_watcher;
/**
* This is checked by ProtocolStreamReader so that it will stop reading from streams in the forked process.
* There could be buffered bytes in stdin/over TCP, those would be processed by TCP if it were not for this check.
@ -43,15 +47,7 @@ class ProtocolStreamReader extends Emitter implements ProtocolReader
{
$this->input = $input;
$this->on(
'close',
/** @return void */
function () {
Loop\removeReadStream($this->input);
}
);
Loop\addReadStream(
$read_watcher = Loop::onReadable(
$this->input,
/** @return void */
function () {
@ -72,6 +68,16 @@ class ProtocolStreamReader extends Emitter implements ProtocolReader
}
}
);
$this->read_watcher = $read_watcher;
$this->on(
'close',
/** @return void */
function () {
Loop::cancel($this->read_watcher);
}
);
}
/**

View File

@ -4,7 +4,8 @@ declare(strict_types = 1);
namespace Psalm\Internal\LanguageServer;
use Psalm\Internal\LanguageServer\Message;
use Sabre\Event\{
use Amp\{
Deferred,
Loop,
Promise
};
@ -21,7 +22,12 @@ class ProtocolStreamWriter implements ProtocolWriter
private $output;
/**
* @var array<int, array{message: string, promise: Promise}> $messages
* @var ?string
*/
private $output_watcher;
/**
* @var array<int, array{message: string, deferred: Deferred}> $messages
*/
private $messages = [];
@ -40,7 +46,7 @@ class ProtocolStreamWriter implements ProtocolWriter
{
// if the message queue is currently empty, register a write handler.
if (empty($this->messages)) {
Loop\addWriteStream(
$this->output_watcher = Loop::onWritable(
$this->output,
/** @return void */
function () {
@ -49,12 +55,12 @@ class ProtocolStreamWriter implements ProtocolWriter
);
}
$promise = new Promise();
$deferred = new \Amp\Deferred();
$this->messages[] = [
'message' => (string)$msg,
'promise' => $promise
'deferred' => $deferred
];
return $promise;
return $deferred->promise();
}
/**
@ -67,7 +73,7 @@ class ProtocolStreamWriter implements ProtocolWriter
$keepWriting = true;
while ($keepWriting) {
$message = $this->messages[0]['message'];
$promise = $this->messages[0]['promise'];
$deferred = $this->messages[0]['deferred'];
$bytesWritten = @fwrite($this->output, $message);
@ -80,12 +86,12 @@ class ProtocolStreamWriter implements ProtocolWriter
array_shift($this->messages);
// This was the last message in the queue, remove the write handler.
if (count($this->messages) === 0) {
Loop\removeWriteStream($this->output);
if (count($this->messages) === 0 && $this->output_watcher) {
Loop::cancel($this->output_watcher);
$keepWriting = false;
}
$promise->fulfill();
$deferred->resolve();
} else {
$this->messages[0]['message'] = $message;
$keepWriting = false;

View File

@ -4,7 +4,7 @@ declare(strict_types = 1);
namespace Psalm\Internal\LanguageServer;
use Psalm\Internal\LanguageServer\Message;
use Sabre\Event\Promise;
use Amp\Promise;
interface ProtocolWriter
{

View File

@ -41,8 +41,8 @@ use Psalm\Codebase;
use Psalm\Internal\LanguageServer\Index\ReadableIndex;
use Psalm\Internal\Analyzer\FileAnalyzer;
use Psalm\Internal\Analyzer\ClassLikeAnalyzer;
use Sabre\Event\Promise;
use function Sabre\Event\coroutine;
use Amp\Promise;
use function Amp\coroutine;
use function Psalm\Internal\LanguageServer\{waitForEvent, isVendored};
/**
@ -174,7 +174,7 @@ class TextDocument
*/
public function definition(TextDocumentIdentifier $textDocument, Position $position): Promise
{
return coroutine(
return \Amp\call(
/**
* @return \Generator<int, true, mixed, Hover|Location>
*/
@ -226,7 +226,7 @@ class TextDocument
*/
public function hover(TextDocumentIdentifier $textDocument, Position $position): Promise
{
return coroutine(
return \Amp\call(
/**
* @return \Generator<int, true, mixed, Hover>
*/
@ -278,7 +278,7 @@ class TextDocument
*/
public function completion(TextDocumentIdentifier $textDocument, Position $position): Promise
{
return coroutine(
return \Amp\call(
/**
* @return \Generator<int, true, mixed, array<empty, empty>|CompletionList>
*/

View File

@ -1,6 +1,6 @@
<?php
namespace Sabre\Event;
namespace Amp;
/**
* @template TReturn
@ -9,6 +9,13 @@ namespace Sabre\Event;
*/
function coroutine(callable $gen) : Promise {}
/**
* @template TReturn
* @param callable():(\Generator<mixed, mixed, mixed, TReturn>|null) $gen
* @return Promise<TReturn>
*/
function call(callable $gen) : Promise {}
/**
* @template TReturn
*/