Websocket EventHandler

This commit is contained in:
Alexander Pankratov 2020-01-12 22:25:12 +03:00
parent 6d2efe4c0f
commit eb289abf6e
13 changed files with 548 additions and 137 deletions

View File

@ -74,6 +74,13 @@ Fast, simple, async php telegram api server:
* `http://127.0.0.1:9503/api/session/getSelf` * `http://127.0.0.1:9503/api/session/getSelf`
Each session is store in `{$session}.madeline` file in root folder of library. Each session is store in `{$session}.madeline` file in root folder of library.
* EventHandler updates via websocket. Connect to `ws://127.0.0.1:9503/events`. You will get all events in json.
Each event stored inside object, where key is name of session which created event.
When using CombinedAPI (multiple account) name of session can be added to path of websocket endpoint.
`ws://127.0.0.1:9503/events/session_name`. This endpoint will emmit events only from given session.
PHP websocket client example: [websocket-events.php](https://github.com/xtrime-ru/TelegramApiServer/blob/master/examples/websocket-events.php)
Examples: Examples:
* get_info about channel/user: `http://127.0.0.1:9503/api/getInfo/?id=@xtrime` * get_info about channel/user: `http://127.0.0.1:9503/api/getInfo/?id=@xtrime`

View File

@ -9,6 +9,8 @@
"php": ">=7.4.0", "php": ">=7.4.0",
"ext-json": "*", "ext-json": "*",
"amphp/http-server": "^2", "amphp/http-server": "^2",
"amphp/http-server-router": "^1",
"amphp/websocket-server": "^2",
"vlucas/phpdotenv": "^4", "vlucas/phpdotenv": "^4",
"danog/madelineproto":"^5" "danog/madelineproto":"^5"
}, },

195
composer.lock generated
View File

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically" "This file is @generated automatically"
], ],
"content-hash": "8d2aca34a6b1620d5e86005db622b2dd", "content-hash": "634e5782aa030ceebc6caeed35e42977",
"packages": [ "packages": [
{ {
"name": "amphp/amp", "name": "amphp/amp",
@ -340,16 +340,16 @@
}, },
{ {
"name": "amphp/hpack", "name": "amphp/hpack",
"version": "v3.0.0", "version": "v3.1.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/amphp/hpack.git", "url": "https://github.com/amphp/hpack.git",
"reference": "84fb1373b8a3cfdf7462a87a3e79efe503f0e101" "reference": "0dcd35f9a8d9fc04d5fb8af0aeb109d4474cfad8"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/amphp/hpack/zipball/84fb1373b8a3cfdf7462a87a3e79efe503f0e101", "url": "https://api.github.com/repos/amphp/hpack/zipball/0dcd35f9a8d9fc04d5fb8af0aeb109d4474cfad8",
"reference": "84fb1373b8a3cfdf7462a87a3e79efe503f0e101", "reference": "0dcd35f9a8d9fc04d5fb8af0aeb109d4474cfad8",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@ -394,7 +394,7 @@
"hpack", "hpack",
"http-2" "http-2"
], ],
"time": "2019-12-12T21:37:06+00:00" "time": "2020-01-11T19:33:14+00:00"
}, },
{ {
"name": "amphp/http", "name": "amphp/http",
@ -666,6 +666,69 @@
], ],
"time": "2020-01-04T18:10:10+00:00" "time": "2020-01-04T18:10:10+00:00"
}, },
{
"name": "amphp/http-server-router",
"version": "v1.0.2",
"source": {
"type": "git",
"url": "https://github.com/amphp/http-server-router.git",
"reference": "c6a1731f3833f3a4b4e4cd633889eb14b5ef635b"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/amphp/http-server-router/zipball/c6a1731f3833f3a4b4e4cd633889eb14b5ef635b",
"reference": "c6a1731f3833f3a4b4e4cd633889eb14b5ef635b",
"shasum": ""
},
"require": {
"amphp/http": "^1",
"amphp/http-server": "^2 || ^1 || ^0.8",
"cash/lrucache": "^1",
"nikic/fast-route": "^1"
},
"require-dev": {
"amphp/log": "^1",
"amphp/phpunit-util": "^1",
"friendsofphp/php-cs-fixer": "^2.3",
"league/uri-schemes": "^1.1",
"phpunit/phpunit": "^6"
},
"type": "library",
"autoload": {
"psr-4": {
"Amp\\Http\\Server\\": "src"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Daniel Lowrey",
"email": "rdlowrey@php.net"
},
{
"name": "Bob Weinand"
},
{
"name": "Niklas Keller",
"email": "me@kelunik.com"
},
{
"name": "Aaron Piotrowski",
"email": "aaron@trowski.com"
}
],
"description": "Router responder for Amp's HTTP server.",
"homepage": "https://github.com/amphp/http-server-router",
"keywords": [
"http",
"router",
"server"
],
"time": "2019-08-21T15:51:20+00:00"
},
{ {
"name": "amphp/parallel", "name": "amphp/parallel",
"version": "v1.2.0", "version": "v1.2.0",
@ -1140,6 +1203,80 @@
], ],
"time": "2019-12-22T13:22:00+00:00" "time": "2019-12-22T13:22:00+00:00"
}, },
{
"name": "amphp/websocket-server",
"version": "v2.0.0-rc1",
"source": {
"type": "git",
"url": "https://github.com/amphp/websocket-server.git",
"reference": "8c723e902a56a41eefbf30d7ee1475755743daa2"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/amphp/websocket-server/zipball/8c723e902a56a41eefbf30d7ee1475755743daa2",
"reference": "8c723e902a56a41eefbf30d7ee1475755743daa2",
"shasum": ""
},
"require": {
"amphp/amp": "^2.2",
"amphp/byte-stream": "^1.6.1",
"amphp/http": "^1.3",
"amphp/http-server": "^2",
"amphp/socket": "^1",
"amphp/websocket": "^1",
"php": ">=7.1"
},
"require-dev": {
"amphp/http-client": "^4",
"amphp/http-server-router": "^1.0.2",
"amphp/http-server-static-content": "^1.0.4",
"amphp/log": "^1",
"amphp/php-cs-fixer-config": "dev-master",
"amphp/phpunit-util": "^1.1",
"infection/infection": "^0.9.3",
"league/climate": "^3",
"league/uri-schemes": "^1.1",
"phpunit/phpunit": "^8 || ^7"
},
"suggest": {
"ext-zlib": "Required for compression"
},
"type": "library",
"autoload": {
"psr-4": {
"Amp\\Websocket\\Server\\": "src"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Daniel Lowrey",
"email": "rdlowrey@php.net"
},
{
"name": "Bob Weinand"
},
{
"name": "Niklas Keller",
"email": "me@kelunik.com"
},
{
"name": "Aaron Piotrowski",
"email": "aaron@trowski.com"
}
],
"description": "Websocket server for Amp's HTTP server.",
"homepage": "https://github.com/amphp/websocket-server",
"keywords": [
"http",
"server",
"websocket"
],
"time": "2019-08-21T17:09:20+00:00"
},
{ {
"name": "amphp/windows-registry", "name": "amphp/windows-registry",
"version": "v0.3.2", "version": "v0.3.2",
@ -1965,6 +2102,52 @@
], ],
"time": "2018-11-22T07:55:51+00:00" "time": "2018-11-22T07:55:51+00:00"
}, },
{
"name": "nikic/fast-route",
"version": "v1.3.0",
"source": {
"type": "git",
"url": "https://github.com/nikic/FastRoute.git",
"reference": "181d480e08d9476e61381e04a71b34dc0432e812"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/nikic/FastRoute/zipball/181d480e08d9476e61381e04a71b34dc0432e812",
"reference": "181d480e08d9476e61381e04a71b34dc0432e812",
"shasum": ""
},
"require": {
"php": ">=5.4.0"
},
"require-dev": {
"phpunit/phpunit": "^4.8.35|~5.7"
},
"type": "library",
"autoload": {
"psr-4": {
"FastRoute\\": "src/"
},
"files": [
"src/functions.php"
]
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"BSD-3-Clause"
],
"authors": [
{
"name": "Nikita Popov",
"email": "nikic@php.net"
}
],
"description": "Fast request router for PHP",
"keywords": [
"router",
"routing"
],
"time": "2018-02-13T20:26:39+00:00"
},
{ {
"name": "paragonie/constant_time_encoding", "name": "paragonie/constant_time_encoding",
"version": "v1.0.4", "version": "v1.0.4",

View File

@ -0,0 +1,35 @@
<?php
/**
* Get all updates from MadelineProto EventHandler running inside TelegramApiServer via websocket
* @see \TelegramApiServer\Controllers\EventsController
*/
require 'vendor/autoload.php';
use Amp\Websocket\Client\Connection;
use Amp\Websocket\Message;
use function Amp\Websocket\Client\connect;
$shortopts = 'u::';
$longopts = [
'url::',
];
$options = getopt($shortopts, $longopts);
$options = [
'url' => $options['url'] ?? $options['u'] ?? 'ws://127.0.0.1:9503/events',
];
Amp\Loop::run(function () use($options) {
echo "Connecting to: {$options['url']}" . PHP_EOL;
/** @var Connection $connection */
$connection = yield connect($options['url']);
echo 'Waiting for events...' . PHP_EOL;
while ($message = yield $connection->receive()) {
/** @var Message $message */
$payload = yield $message->buffer();
printf("Received event: %s\n", $payload);
}
});

View File

@ -53,7 +53,7 @@ foreach ($options['session'] as $session) {
if (!$session) { if (!$session) {
$session = 'session'; $session = 'session';
} }
$session = TelegramApiServer\Client::getSessionFileName($session); $session = TelegramApiServer\Client::getSessionFile($session);
$sessionFiles[$session] = ''; $sessionFiles[$session] = '';
} }

View File

@ -2,13 +2,14 @@
namespace TelegramApiServer; namespace TelegramApiServer;
use Amp\Loop;
use danog\MadelineProto; use danog\MadelineProto;
use TelegramApiServer\EventHandlers\EventHandler;
class Client class Client
{ {
/** @var MadelineProto\CombinedAPI */ private static string $sessionExtension = '.madeline';
public MadelineProto\CombinedAPI $MadelineProto; public ?MadelineProto\CombinedAPI $MadelineProtoCombined = null;
private ?string $defaultSession = null;
/** /**
* Client constructor. * Client constructor.
@ -28,9 +29,6 @@ class Client
} }
unset($session); unset($session);
if (count($sessions) === 1) {
$this->defaultSession = (string) array_key_first($sessions);
}
$this->connect($sessions); $this->connect($sessions);
} }
@ -39,9 +37,24 @@ class Client
* *
* @return string|null * @return string|null
*/ */
public static function getSessionFileName(?string $session): ?string public static function getSessionFile(?string $session): ?string
{ {
return $session ? "{$session}.madeline" : null; return $session ? ($session . static::$sessionExtension) : null;
}
public static function getSessionName(?string $sessionFile): ?string
{
if (!$sessionFile) {
return null;
}
$extensionPosition = strrpos($sessionFile, static::$sessionExtension);
if($extensionPosition === false) {
return null;
}
$sessionName = substr_replace($sessionFile, '', $extensionPosition, strlen(static::$sessionExtension));
return $sessionName ?: null;
} }
/** /**
@ -52,17 +65,27 @@ class Client
//При каждой инициализации настройки обновляются из массива $config //При каждой инициализации настройки обновляются из массива $config
echo PHP_EOL . 'Starting MadelineProto...' . PHP_EOL; echo PHP_EOL . 'Starting MadelineProto...' . PHP_EOL;
$time = microtime(true); $time = microtime(true);
$this->MadelineProto = new MadelineProto\CombinedAPI('combined_session.madeline', $sessions);
$this->MadelineProto->async(true); $this->MadelineProtoCombined = new MadelineProto\CombinedAPI('combined_session.madeline', $sessions);
$this->MadelineProto->loop(function() use($sessions) { //В сессии могут быть ссылки на несуществующие классы после обновления кода. Она нам не нужна.
$res = []; $this->MadelineProtoCombined->session = null;
$this->MadelineProtoCombined->async(true);
$this->MadelineProtoCombined->loop(function() use($sessions) {
$promises = [];
foreach ($sessions as $session => $message) { foreach ($sessions as $session => $message) {
MadelineProto\Logger::log("Starting session: {$session}", MadelineProto\Logger::WARNING); MadelineProto\Logger::log("Starting session: {$session}", MadelineProto\Logger::WARNING);
$res[] = $this->MadelineProto->instances[$session]->start(); $promises[]= $this->MadelineProtoCombined->instances[$session]->start();
} }
yield $this->MadelineProto->all($res); yield $this->MadelineProtoCombined::all($promises);
$this->MadelineProtoCombined->setEventHandler(EventHandler::class);
}); });
Loop::defer(function() {
$this->MadelineProtoCombined->loop();
});
$time = round(microtime(true) - $time, 3); $time = round(microtime(true) - $time, 3);
$sessionsCount = count($sessions); $sessionsCount = count($sessions);
MadelineProto\Logger::log( MadelineProto\Logger::log(
@ -78,21 +101,24 @@ class Client
* *
* @return MadelineProto\API * @return MadelineProto\API
*/ */
public function getInstance(?string $session): MadelineProto\API public function getInstance(?string $session = null): MadelineProto\API
{ {
$session = static::getSessionFileName($session) ?: $this->defaultSession; if (count($this->MadelineProtoCombined->instances) === 1) {
$session = (string) array_key_first($this->MadelineProtoCombined->instances);
} else {
$session = static::getSessionFile($session);
}
if (!$session) { if (!$session) {
throw new \InvalidArgumentException('Multiple sessions detected. You need to specify which session to use'); throw new \InvalidArgumentException('Multiple sessions detected. You need to specify which session to use');
} }
if (empty($this->MadelineProto->instances[$session])) { if (empty($this->MadelineProtoCombined->instances[$session])) {
throw new \InvalidArgumentException('Session not found'); throw new \InvalidArgumentException('Session not found');
} }
return $this->MadelineProto->instances[$session]; return $this->MadelineProtoCombined->instances[$session];
} }
} }

View File

@ -8,7 +8,7 @@ use danog\MadelineProto\TL\Conversion\BotAPI;
use function Amp\call; use function Amp\call;
use \danog\MadelineProto; use \danog\MadelineProto;
class CustomMethods class ClientCustomMethods
{ {
use BotAPI; use BotAPI;

View File

@ -6,14 +6,11 @@ namespace TelegramApiServer;
class Config class Config
{ {
/** private static ?Config $instance = null;
* @var self private array $config;
*/
private static $instance;
private $config;
public static function getInstance() public static function getInstance(): Config
{ {
if (null === static::$instance) { if (null === static::$instance) {
static::$instance = new static(); static::$instance = new static();
@ -57,13 +54,13 @@ class Config
private function findByKey($key) private function findByKey($key)
{ {
$key = (string)$key; $key = (string) $key;
$path = explode('.', $key); $path = explode('.', $key);
$value = &$this->config; $value = &$this->config;
foreach ($path as $pathKey) { foreach ($path as $pathKey) {
if (!is_array($value) || !array_key_exists($pathKey, $value)) { if (!is_array($value) || !array_key_exists($pathKey, $value)) {
return; return null;
} }
$value = &$value[$pathKey]; $value = &$value[$pathKey];
} }

View File

@ -1,19 +1,22 @@
<?php <?php
namespace TelegramApiServer; namespace TelegramApiServer\Controllers;
use Amp\ByteStream\ResourceInputStream; use Amp\ByteStream\ResourceInputStream;
use Amp\Http\Server\Request; use Amp\Http\Server\Request;
use Amp\Http\Server\RequestHandler\CallableRequestHandler;
use Amp\Http\Server\Response;
use Amp\Http\Server\Router;
use Amp\Promise; use Amp\Promise;
use TelegramApiServer\Client;
use TelegramApiServer\Config;
use TelegramApiServer\ClientCustomMethods;
class RequestCallback class ApiController
{ {
private Client $client;
private $client; private array $ipWhiteList;
private const PAGES = ['index', 'api']; public array $page = [
/** @var array */
private $ipWhiteList;
public $page = [
'headers' => [ 'headers' => [
'Content-Type'=>'application/json;charset=utf-8', 'Content-Type'=>'application/json;charset=utf-8',
], ],
@ -24,8 +27,23 @@ class RequestCallback
]; ];
private array $parameters = []; private array $parameters = [];
private array $api; private array $api;
private string $session = ''; private ?string $session = '';
public static function getRouterCallback($client): CallableRequestHandler
{
return new CallableRequestHandler(
static function (Request $request) use($client) {
$requestCallback = new static($client);
$response = yield from $requestCallback->process($request);
return new Response(
$requestCallback->page['code'],
$requestCallback->page['headers'],
$response
);
}
);
}
/** /**
* RequestCallback constructor. * RequestCallback constructor.
@ -34,9 +52,8 @@ class RequestCallback
*/ */
public function __construct(Client $client) public function __construct(Client $client)
{ {
$this->ipWhiteList = (array)Config::getInstance()->get('api.ip_whitelist', []); $this->ipWhiteList = (array) Config::getInstance()->get('api.ip_whitelist', []);
$this->client = $client; $this->client = $client;
} }
/** /**
@ -52,7 +69,7 @@ class RequestCallback
} }
yield from $this yield from $this
->resolvePage($request->getUri()->getPath()) ->resolvePath($request->getAttribute(Router::class))
->resolveRequest($request->getUri()->getQuery(), $body, $request->getHeader('Content-Type')) ->resolveRequest($request->getUri()->getQuery(), $body, $request->getHeader('Content-Type'))
->generateResponse($request) ->generateResponse($request)
; ;
@ -60,38 +77,29 @@ class RequestCallback
return $this->getResponse(); return $this->getResponse();
} }
/** /**
* Определяет какую страницу запросили * Получаем параметры из uri
* *
* @param $uri * @param array $path
* @return RequestCallback *
* @return ApiController
*/ */
private function resolvePage($uri): self private function resolvePath(array $path): self
{ {
preg_match("~/(?'page'[^/]*)(?:/(?'session'[^/]*))?/(?'method'[^/]*)~", $uri, $matches); $this->session = $path['session'] ?? null;
$this->api = explode('.', $path['method'] ?? '');
$page = $matches['page'] ?? null;
$this->session = $matches['session'] ?? null;
$this->api = explode('.', $matches['method'] ?? '');
if (!in_array($page, self::PAGES, true)) {
$this->setPageCode(404);
$this->page['errors'][] = 'Incorrect path';
}
if (count($this->api) === 0) {
$this->setPageCode(404);
$this->page['errors'][] = 'No method specified';
}
return $this; return $this;
} }
/** /**
* Получаем параметры из GET и POST
*
* @param string $query * @param string $query
* @param string|null $body * @param string|null $body
* @param string|null $contentType * @param string|null $contentType
* @return RequestCallback *
* @return ApiController
*/ */
private function resolveRequest(string $query, $body, $contentType) private function resolveRequest(string $query, $body, $contentType)
{ {
@ -115,7 +123,8 @@ class RequestCallback
* Получает посты для формирования ответа * Получает посты для формирования ответа
* *
* @param Request $request * @param Request $request
* @return RequestCallback *
* @return ApiController
* @throws \Throwable * @throws \Throwable
*/ */
private function generateResponse(Request $request) private function generateResponse(Request $request)
@ -151,8 +160,8 @@ class RequestCallback
private function callApi() private function callApi()
{ {
$pathSize = count($this->api); $pathSize = count($this->api);
if ($pathSize === 1 && is_callable([CustomMethods::class,$this->api[0]])) { if ($pathSize === 1 && is_callable([ClientCustomMethods::class,$this->api[0]])) {
$customMethods = new CustomMethods($this->client->getInstance($this->session)); $customMethods = new ClientCustomMethods($this->client->getInstance($this->session));
$result = $customMethods->{$this->api[0]}(...$this->parameters); $result = $customMethods->{$this->api[0]}(...$this->parameters);
} else { } else {
//Проверяем нет ли в MadilineProto такого метода. //Проверяем нет ли в MadilineProto такого метода.
@ -176,7 +185,8 @@ class RequestCallback
/** /**
* @param \Throwable $e * @param \Throwable $e
* @return RequestCallback *
* @return ApiController
* @throws \Throwable * @throws \Throwable
*/ */
private function setError(\Throwable $e): self private function setError(\Throwable $e): self
@ -221,16 +231,24 @@ class RequestCallback
$data['success'] = 1; $data['success'] = 1;
} }
$result = json_encode($data, JSON_INVALID_UTF8_SUBSTITUTE|JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); $result = json_encode(
$data,
JSON_THROW_ON_ERROR |
JSON_INVALID_UTF8_SUBSTITUTE |
JSON_PRETTY_PRINT |
JSON_UNESCAPED_SLASHES |
JSON_UNESCAPED_UNICODE
);
return $result; return $result . "\n";
} }
/** /**
* Устанавливает http код ответа (200, 400, 404 и тд.) * Устанавливает http код ответа (200, 400, 404 и тд.)
* *
* @param int $code * @param int $code
* @return RequestCallback *
* @return ApiController
*/ */
private function setPageCode(int $code): self private function setPageCode(int $code): self
{ {

View File

@ -0,0 +1,71 @@
<?php
namespace TelegramApiServer\Controllers;
use Amp\Http\Server\Request;
use Amp\Http\Server\Response;
use Amp\Http\Server\Router;
use Amp\Promise;
use Amp\Success;
use Amp\Websocket\Server\Websocket;
use TelegramApiServer\Client;
use TelegramApiServer\EventHandlers\EventHandler;
use function Amp\call;
class EventsController extends Websocket
{
private Client $client;
public static function getRouterCallback(Client $client): EventsController
{
$class = new static();
$class->client = $client;
return $class;
}
public function onHandshake(Request $request, Response $response): Promise
{
try {
$session = $request->getAttribute(Router::class)['session'] ?? null;
if ($session) {
$this->client->getInstance($session);
}
} catch (\Throwable $e){
$response->setStatus(400);
$response->setBody($e->getMessage());
}
return new Success($response);
}
public function onConnect(\Amp\Websocket\Client $client, Request $request, Response $response): Promise
{
return call(function() use($client, $request) {
$requestedSession = $request->getAttribute(Router::class)['session'] ?? null;
$this->subscribeForUpdates($client, $requestedSession);
while ($message = yield $client->receive()) {
// Messages received on the connection are ignored and discarded.
// Messages must be received properly to maintain connection with client (ping-pong check).
}
});
}
private function subscribeForUpdates(\Amp\Websocket\Client $client, ?string $requestedSession): void
{
$clientId = $client->getId();
$client->onClose(static function() use($clientId) {
EventHandler::removeEventListener($clientId);
});
EventHandler::addEventListener($clientId, function($update, string $session) use($clientId, $requestedSession) {
if ($requestedSession && $session !== $requestedSession) {
return;
}
$update = [$session => $update];
$this->multicast(json_encode($update, JSON_INVALID_UTF8_IGNORE | JSON_UNESCAPED_UNICODE), [$clientId]);
});
}
}

View File

@ -0,0 +1,39 @@
<?php
namespace TelegramApiServer\EventHandlers;
use danog\MadelineProto\CombinedEventHandler;
use danog\MadelineProto\Logger;
use TelegramApiServer\Client;
class EventHandler extends CombinedEventHandler
{
/** @var callable[] */
public static array $eventListeners = [];
public static function addEventListener($clientId, callable $callback)
{
Logger::log("Add event listener. ClientId: {$clientId}");
static::$eventListeners[$clientId] = $callback;
}
public static function removeEventListener($clientId): void
{
Logger::log("Removing listener: {$clientId}");
unset(static::$eventListeners[$clientId]);
if (!static::$eventListeners) {
static::$eventListeners = [];
}
}
public function onAny($update, $sessionFile): void
{
$session = Client::getSessionName($sessionFile);
Logger::log("Got update from session: {$session}");
foreach (static::$eventListeners as $clientId => $callback) {
Logger::log("Pass update to callback. ClientId: {$clientId}");
$callback($update, $session);
}
}
}

View File

@ -11,9 +11,15 @@
namespace TelegramApiServer; namespace TelegramApiServer;
use DateTimeInterface;
use Psr\Log\AbstractLogger; use Psr\Log\AbstractLogger;
use Psr\Log\InvalidArgumentException; use Psr\Log\InvalidArgumentException;
use Psr\Log\LogLevel; use Psr\Log\LogLevel;
use danog\MadelineProto;
use function get_class;
use function gettype;
use function is_object;
use const PHP_EOL;
/** /**
* Minimalist PSR-3 logger designed to write in stderr or any other stream. * Minimalist PSR-3 logger designed to write in stderr or any other stream.
@ -22,7 +28,7 @@ use Psr\Log\LogLevel;
*/ */
class Logger extends AbstractLogger class Logger extends AbstractLogger
{ {
private static $levels = [ private static array $levels = [
LogLevel::DEBUG => 0, LogLevel::DEBUG => 0,
LogLevel::INFO => 1, LogLevel::INFO => 1,
LogLevel::NOTICE => 2, LogLevel::NOTICE => 2,
@ -33,8 +39,20 @@ class Logger extends AbstractLogger
LogLevel::EMERGENCY => 7, LogLevel::EMERGENCY => 7,
]; ];
private $minLevelIndex; private static array $madelineLevels = [
private $formatter; LogLevel::DEBUG => MadelineProto\Logger::ULTRA_VERBOSE,
LogLevel::INFO => MadelineProto\Logger::VERBOSE,
LogLevel::NOTICE => MadelineProto\Logger::NOTICE,
LogLevel::WARNING => MadelineProto\Logger::WARNING,
LogLevel::ERROR => MadelineProto\Logger::ERROR,
LogLevel::CRITICAL => MadelineProto\Logger::FATAL_ERROR,
LogLevel::ALERT => MadelineProto\Logger::FATAL_ERROR,
LogLevel::EMERGENCY => MadelineProto\Logger::FATAL_ERROR,
];
private static string $dateTimeFormat = 'Y-m-d H:i:s';
private int $minLevelIndex;
private array $formatter;
public function __construct(string $minLevel = LogLevel::WARNING, callable $formatter = null) public function __construct(string $minLevel = LogLevel::WARNING, callable $formatter = null)
{ {
@ -60,7 +78,7 @@ class Logger extends AbstractLogger
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function log($level, $message, array $context = []) public function log($level, $message, array $context = []): void
{ {
if (!isset(self::$levels[$level])) { if (!isset(self::$levels[$level])) {
throw new InvalidArgumentException(sprintf('The log level "%s" does not exist.', $level)); throw new InvalidArgumentException(sprintf('The log level "%s" does not exist.', $level));
@ -72,8 +90,7 @@ class Logger extends AbstractLogger
$formatter = $this->formatter; $formatter = $this->formatter;
//TODO: Convert LogLevel to MadelineProto loglevels. MadelineProto\Logger::log($formatter($level, $message, $context), static::$madelineLevels[$level]);
\danog\MadelineProto\Logger::log($formatter($level, $message, $context), \danog\MadelineProto\Logger::NOTICE);
} }
private function format(string $level, string $message, array $context): string private function format(string $level, string $message, array $context): string
@ -81,20 +98,20 @@ class Logger extends AbstractLogger
if (false !== strpos($message, '{')) { if (false !== strpos($message, '{')) {
$replacements = []; $replacements = [];
foreach ($context as $key => $val) { foreach ($context as $key => $val) {
if (null === $val || is_scalar($val) || (\is_object($val) && method_exists($val, '__toString'))) { if (null === $val || is_scalar($val) || (is_object($val) && method_exists($val, '__toString'))) {
$replacements["{{$key}}"] = $val; $replacements["{{$key}}"] = $val;
} elseif ($val instanceof \DateTimeInterface) { } elseif ($val instanceof DateTimeInterface) {
$replacements["{{$key}}"] = $val->format(\DateTime::RFC3339); $replacements["{{$key}}"] = $val->format(static::$dateTimeFormat);
} elseif (\is_object($val)) { } elseif (is_object($val)) {
$replacements["{{$key}}"] = '[object '.\get_class($val).']'; $replacements["{{$key}}"] = '[object '. get_class($val).']';
} else { } else {
$replacements["{{$key}}"] = '['.\gettype($val).']'; $replacements["{{$key}}"] = '['. gettype($val).']';
} }
} }
$message = strtr($message, $replacements); $message = strtr($message, $replacements);
} }
return sprintf('%s [%s] %s', date(\DateTime::RFC3339), $level, $message).\PHP_EOL; return sprintf('[%s] [%s] %s', date(static::$dateTimeFormat), $level, $message). PHP_EOL;
} }
} }

View File

@ -4,16 +4,14 @@ namespace TelegramApiServer;
use Amp; use Amp;
use Amp\Http\Server\RequestHandler\CallableRequestHandler; use Amp\Http\Server\RequestHandler\CallableRequestHandler;
use Amp\Promise;
use Amp\Socket;
use Amp\Http\Server\Request; use Amp\Http\Server\Request;
use Amp\Http\Server\Response; use Amp\Http\Server\Response;
use Psr\Log\LogLevel; use Psr\Log\LogLevel;
use TelegramApiServer\Controllers\ApiController;
use TelegramApiServer\Controllers\EventsController;
class Server class Server
{ {
private $config = [];
/** /**
* Server constructor. * Server constructor.
* @param Client $client * @param Client $client
@ -21,31 +19,10 @@ class Server
*/ */
public function __construct(Client $client, array $options) public function __construct(Client $client, array $options)
{ {
$this->setConfig($options); Amp\Loop::run(function () use ($client, $options) {
Amp\Loop::run(function () use ($client) {
$sockets = [
Socket\listen("{$this->config['address']}:{$this->config['port']}"),
];
$server = new Amp\Http\Server\Server( $server = new Amp\Http\Server\Server(
$sockets, $this->getServerAddresses(static::getConfig($options)),
new CallableRequestHandler(function (Request $request) use($client) { static::getRouter($client),
//На каждый запрос должны создаваться новые экземпляры классов парсера и коллбеков,
//иначе их данные будут в области видимости всех запросов.
//Телеграм клиент инициализируется 1 раз и используется во всех запросах.
$requestCallback = new RequestCallback($client);
$response = yield from $requestCallback->process($request);
return new Response(
$requestCallback->page['code'],
$requestCallback->page['headers'],
$response
);
}),
new Logger(LogLevel::DEBUG), new Logger(LogLevel::DEBUG),
(new Amp\Http\Server\Options()) (new Amp\Http\Server\Options())
->withCompression() ->withCompression()
@ -54,43 +31,82 @@ class Server
yield $server->start(); yield $server->start();
// Stop the server gracefully when SIGINT is received. static::registerShutdown($server);
// This is technically optional, but it is best to call Server::stop(). });
}
private static function getServerAddresses(array $config): array
{
return [
Amp\Socket\Server::listen("{$config['address']}:{$config['port']}"),
];
}
private static function getRouter(Client $client): Amp\Http\Server\Router
{
$router = new Amp\Http\Server\Router();
foreach (['GET', 'POST'] as $method) {
$router->addRoute($method, '/api/{session}/{method}', ApiController::getRouterCallback($client));
$router->addRoute($method, '/api/{method}', ApiController::getRouterCallback($client));
$router->addRoute($method, '/events[/{session}]', EventsController::getRouterCallback($client));
}
$router->setFallback(new CallableRequestHandler(static function (Request $request) {
return new Response(
Amp\Http\Status::NOT_FOUND,
[ 'Content-Type'=>'application/json;charset=utf-8'],
json_encode(
[
'success' => 0,
'errors' => [
[
'code' => 404,
'message' => 'Path not found',
]
]
],
JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT
) . "\n"
);
}));
return $router;
}
/**
* Stop the server gracefully when SIGINT is received.
* This is technically optional, but it is best to call Server::stop().
*
* @throws Amp\Loop\UnsupportedFeatureException
*/
private static function registerShutdown(Amp\Http\Server\Server $server)
{
if (defined('SIGINT')) { if (defined('SIGINT')) {
Amp\Loop::onSignal(SIGINT, static function (string $watcherId) use ($server) { Amp\Loop::onSignal(SIGINT, static function (string $watcherId) use ($server) {
Amp\Loop::cancel($watcherId); Amp\Loop::cancel($watcherId);
yield $server->stop(); yield $server->stop();
exit;
}); });
} }
});
} }
/** /**
* Установить конфигурацию для http-сервера * Установить конфигурацию для http-сервера
* *
* @param array $config * @param array $config
* @return Server * @return array
*/ */
private function setConfig(array $config = []): self private static function getConfig(array $config = []): array
{ {
$config = array_filter($config); $config = array_filter($config);
$this->config = array_merge( $config = array_merge(
Config::getInstance()->get("server", []), Config::getInstance()->get('server', []),
$config $config
); );
return $this; return $config;
}
public function resolvePromise(&$promise) {
if ($promise instanceof Promise) {
return yield $promise;
}
return yield;
} }
} }