Logs via websockets

This commit is contained in:
Alexander Pankratov 2020-02-10 04:50:17 +03:00
parent 88a5eafcd0
commit 0902e1279c
11 changed files with 143 additions and 68 deletions

View File

@ -193,16 +193,35 @@ After authorization eventHandler need to be set, to receive updates for new sess
* `http://127.0.0.1:9503/api/users/xtrime/setEventHandler`
* `http://127.0.0.1:9503/api/bot/setEventHandler`
### EventHandler updates via websocket.
### Websocket
#### EventHandler updates (webhooks).
Connect to `ws://127.0.0.1:9503/events`. You will get all events in json.
Connect to `ws://127.0.0.1:9503/events` to get all events in json.
This is efficient alternative for webhooks.
Each event is json object in [json-rpc 2.0 format](https://www.jsonrpc.org/specification#response_object). Example:
When using CombinedAPI (multiple accounts) name of session can be added to path of websocket endpoint:
When using multiple sessions, name of session can be added to path of websocket endpoint:
This endpoint will send events only from `users/xtrime` session: `ws://127.0.0.1:9503/events/users/xtrime`
PHP websocket client example: [websocket-events.php](https://github.com/xtrime-ru/TelegramApiServer/blob/master/examples/websocket-events.php)
`php examples/websocket-events.php --url=ws://127.0.0.1:9503/events`
#### Logs.
Connect to `ws://127.0.0.1:9503/log[/%level%]` to get logs in real time.
`%level%` is optional parameter to filter logs.
If filter is specified, then only messages with equal or greater level will be send.
This endpoint will send only alert and emergency logs: `ws://127.0.0.1:9503/log/alert`
Available levels: debug, info, notice, warning, error, critical, alert, emergency.
PHP websocket client example: [websocket-events.php](https://github.com/xtrime-ru/TelegramApiServer/blob/master/examples/websocket-events.php)
`php examples/websocket-events.php --url=ws://127.0.0.1:9503/log`
## Contacts
* Telegram: [@xtrime](tg://resolve?domain=xtrime)

View File

@ -11,7 +11,8 @@ $settings = [
'api_hash' => getenv('TELEGRAM_API_HASH'),
],
'logger' => [ // Logger settings
'logger' => \danog\MadelineProto\Logger::ECHO_LOGGER, // 0 - Logs disabled, 3 - echo logs.
'logger' => \danog\MadelineProto\Logger::CALLABLE_LOGGER, // 0 - Logs disabled, 3 - echo logs.
'logger_param' => \TelegramApiServer\EventObservers\LogObserver::class . '::log',
'logger_level' => getenv('LOGGER_LEVEL'), // Logging level, available logging levels are: ULTRA_VERBOSE - 5, VERBOSE - 4 , NOTICE - 3, WARNING - 2, ERROR - 1, FATAL_ERROR - 0.
],
'updates' => [

View File

@ -65,7 +65,7 @@ class Client
public function connect($sessionFiles): void
{
Logger::getInstance()->warning(PHP_EOL . 'Starting MadelineProto...' . PHP_EOL);
Logger::warning(PHP_EOL . 'Starting MadelineProto...' . PHP_EOL);
foreach ($sessionFiles as $file) {
$sessionName = static::getSessionName($file);
@ -76,7 +76,7 @@ class Client
$this->startSessions();
$sessionsCount = count($sessionFiles);
Logger::getInstance()->warning(
Logger::warning(
"\nTelegramApiServer ready."
. "\nNumber of sessions: {$sessionsCount}."
);
@ -178,7 +178,7 @@ class Client
try {
$callback ? $instance->loop($callback) : $instance->loop();
} catch (\Throwable $e) {
Logger::getInstance()->critical(
Logger::critical(
$e->getMessage(),
[
'session' => $sessionName,

View File

@ -9,7 +9,7 @@ use Amp\Promise;
use Amp\Success;
use Amp\Websocket\Server\Websocket;
use TelegramApiServer\Client;
use TelegramApiServer\EventObservers\EventHandler;
use TelegramApiServer\EventObservers\EventObserver;
use function Amp\call;
class EventsController extends Websocket
@ -41,7 +41,6 @@ class EventsController extends Websocket
{
return call(function() use($client, $request) {
$requestedSession = $request->getAttribute(Router::class)['session'] ?? null;
$this->subscribeForUpdates($client, $requestedSession);
while ($message = yield $client->receive()) {
@ -56,10 +55,10 @@ class EventsController extends Websocket
$clientId = $client->getId();
$client->onClose(static function() use($clientId) {
EventHandler::removeEventListener($clientId);
EventObserver::removeSubscriber($clientId);
});
EventHandler::addEventListener($clientId, function($update, ?string $session) use($clientId, $requestedSession) {
EventObserver::addSubscriber($clientId, function($update, ?string $session) use($clientId, $requestedSession) {
if ($requestedSession && $session !== $requestedSession) {
return;
}

View File

@ -8,31 +8,32 @@ use Amp\Http\Server\Router;
use Amp\Promise;
use Amp\Success;
use Amp\Websocket\Server\Websocket;
use TelegramApiServer\Client;
use Psr\Log\LogLevel;
use TelegramApiServer\EventObservers\LogObserver;
use TelegramApiServer\Logger;
use function Amp\call;
class LogsController extends Websocket
class LogController extends Websocket
{
private Client $client;
public static function getRouterCallback(Client $client): EventsController
public static function getRouterCallback(): LogController
{
$class = new static();
$class->client = $client;
return $class;
return new static();
}
public function onHandshake(Request $request, Response $response): Promise
{
$level = $request->getAttribute(Router::class)['level'] ?? LogLevel::DEBUG;
if (!isset(Logger::$levels[$level])) {
$response->setStatus(400);
}
return new Success($response);
}
public function onConnect(\Amp\Websocket\Client $client, Request $request, Response $response): Promise
{
return call(function() use($client, $request) {
$requestedSession = $request->getAttribute(Router::class)['session'] ?? null;
$this->subscribeForUpdates($client, $requestedSession);
$level = $request->getAttribute(Router::class)['level'] ?? LogLevel::DEBUG;
$this->subscribeForUpdates($client, $level);
while ($message = yield $client->receive()) {
// Messages received on the connection are ignored and discarded.
@ -41,23 +42,24 @@ class LogsController extends Websocket
});
}
private function subscribeForUpdates(\Amp\Websocket\Client $client, ?string $requestedSession): void
private function subscribeForUpdates(\Amp\Websocket\Client $client, string $requestedLevel): void
{
$clientId = $client->getId();
$client->onClose(static function() use($clientId) {
\TelegramApiServer\EventObservers\EventHandler::removeEventListener($clientId);
LogObserver::removeSubscriber($clientId);
});
\TelegramApiServer\EventObservers\EventHandler::addEventListener($clientId, function($update, string $session) use($clientId, $requestedSession) {
if ($requestedSession && $session !== $requestedSession) {
LogObserver::addSubscriber($clientId, function(string $level, string $message, array $context = []) use($clientId, $requestedLevel) {
if ($requestedLevel && Logger::$levels[$level] < Logger::$levels[$requestedLevel]) {
return;
}
$update = [
'jsonrpc' => '2.0',
'result' => [
'session' => $session,
'update' => $update,
'level' => $level,
'message' => $message,
'context' => $context
],
'id' => null,
];

View File

@ -8,10 +8,8 @@ use TelegramApiServer\Logger;
class EventHandler extends \danog\MadelineProto\EventHandler
{
/** @var callable[] */
public static array $eventListeners = [];
private string $sessionName;
public static array $instances = [];
private string $sessionName;
public function __construct(API $MadelineProto)
{
@ -19,40 +17,19 @@ class EventHandler extends \danog\MadelineProto\EventHandler
if (empty(static::$instances[$this->sessionName])) {
static::$instances[$this->sessionName] = true;
parent::__construct($MadelineProto);
Logger::getInstance()->warning("Event observer CONSTRUCTED: {$this->sessionName}");
Logger::warning("Event observer CONSTRUCTED: {$this->sessionName}");
}
}
public function __destruct()
{
unset(static::$instances[$this->sessionName]);
Logger::getInstance()->warning("Event observer DESTRUCTED: {$this->sessionName}");
}
public static function addEventListener($clientId, callable $callback): void
{
Logger::getInstance()->notice("Add event listener. ClientId: {$clientId}");
static::$eventListeners[$clientId] = $callback;
}
public static function removeEventListener($clientId): void
{
Logger::getInstance()->notice("Removing listener: {$clientId}");
unset(static::$eventListeners[$clientId]);
$listenersCount = count(static::$eventListeners);
Logger::getInstance()->notice("Event listeners left: {$listenersCount}");
if ($listenersCount === 0) {
static::$eventListeners = [];
}
Logger::warning("Event observer DESTRUCTED: {$this->sessionName}");
}
public function onAny($update): void
{
Logger::getInstance()->info("Received update from session: {$this->sessionName}");
foreach (static::$eventListeners as $clientId => $callback) {
Logger::getInstance()->notice("Pass update to callback. ClientId: {$clientId}");
$callback($update, $this->sessionName);
}
Logger::info("Received update from session: {$this->sessionName}");
EventObserver::notify($update, $this->sessionName);
}
}

View File

@ -0,0 +1,18 @@
<?php
namespace TelegramApiServer\EventObservers;
use TelegramApiServer\Logger;
class EventObserver
{
use ObserverTrait;
public static function notify(array $update, string $sessionName) {
foreach (static::$subscribers as $clientId => $callback) {
Logger::notice("Pass update to callback. ClientId: {$clientId}");
$callback($update, $sessionName);
}
}
}

View File

@ -0,0 +1,21 @@
<?php
namespace TelegramApiServer\EventObservers;
use TelegramApiServer\Logger;
class LogObserver
{
use ObserverTrait;
public static function notify(string $level, string $message, array $context = []): void
{
foreach (static::$subscribers as $clientId => $callback) {
$callback($level, $message, $context);
}
}
public static function log(string $message, int $level) {
Logger::log(Logger::$madelineLevels[$level], $message);
}
}

View File

@ -0,0 +1,28 @@
<?php
namespace TelegramApiServer\EventObservers;
use TelegramApiServer\Logger;
trait ObserverTrait
{
/** @var callable[] */
public static array $subscribers = [];
public static function addSubscriber($clientId, callable $callback): void
{
Logger::notice("Add event listener. ClientId: {$clientId}");
static::$subscribers[$clientId] = $callback;
}
public static function removeSubscriber($clientId): void
{
Logger::notice("Removing listener: {$clientId}");
unset(static::$subscribers[$clientId]);
$listenersCount = count(static::$subscribers);
Logger::notice("Event listeners left: {$listenersCount}");
if ($listenersCount === 0) {
static::$subscribers = [];
}
}
}

View File

@ -16,6 +16,7 @@ use DateTimeInterface;
use Psr\Log\AbstractLogger;
use Psr\Log\InvalidArgumentException;
use Psr\Log\LogLevel;
use TelegramApiServer\EventObservers\LogObserver;
use function get_class;
use function gettype;
use function is_object;
@ -30,7 +31,7 @@ class Logger extends AbstractLogger
{
private static ?Logger $instanse = null;
private static array $levels = [
public static array $levels = [
LogLevel::DEBUG => 0,
LogLevel::INFO => 1,
LogLevel::NOTICE => 2,
@ -41,15 +42,13 @@ class Logger extends AbstractLogger
LogLevel::EMERGENCY => 7,
];
private static array $madelineLevels = [
LogLevel::DEBUG => MadelineProto\Logger::ULTRA_VERBOSE,
LogLevel::INFO => MadelineProto\Logger::VERBOSE,
LogLevel::NOTICE => MadelineProto\Logger::NOTICE,
LogLevel::WARNING => MadelineProto\Logger::WARNING,
LogLevel::ERROR => MadelineProto\Logger::ERROR,
LogLevel::CRITICAL => MadelineProto\Logger::FATAL_ERROR,
LogLevel::ALERT => MadelineProto\Logger::FATAL_ERROR,
LogLevel::EMERGENCY => MadelineProto\Logger::FATAL_ERROR,
public static array $madelineLevels = [
MadelineProto\Logger::ULTRA_VERBOSE => LogLevel::DEBUG,
MadelineProto\Logger::VERBOSE => LogLevel::INFO,
MadelineProto\Logger::NOTICE => LogLevel::NOTICE,
MadelineProto\Logger::WARNING => LogLevel::WARNING,
MadelineProto\Logger::ERROR => LogLevel::ERROR,
MadelineProto\Logger::FATAL_ERROR => LogLevel::CRITICAL,
];
private static string $dateTimeFormat = 'Y-m-d H:i:s';
@ -91,14 +90,18 @@ class Logger extends AbstractLogger
if (!static::$instanse) {
$settings = Config::getInstance()->get('telegram');
$conversionTable = array_flip(static::$madelineLevels);
$loggerLevel = $conversionTable[$settings['logger']['logger_level']];
$loggerLevel = static::$madelineLevels[$settings['logger']['logger_level']];
static::$instanse = new static($loggerLevel);
}
return static::$instanse;
}
public static function __callStatic($name, $arguments)
{
static::getInstance()->{$name}(...$arguments);
}
/**
* {@inheritdoc}
*/
@ -108,6 +111,8 @@ class Logger extends AbstractLogger
throw new InvalidArgumentException(sprintf('The log level "%s" does not exist.', $level));
}
LogObserver::notify($level, $message, $context);
if (self::$levels[$level] < $this->minLevelIndex) {
return;
}

View File

@ -5,6 +5,7 @@ namespace TelegramApiServer\Server;
use Amp\Http\Server\Request;
use Amp\Http\Server\RequestHandler\CallableRequestHandler;
use TelegramApiServer\Client;
use TelegramApiServer\Controllers\LogController;
use TelegramApiServer\Controllers\SystemController;
use TelegramApiServer\Controllers\ApiController;
use TelegramApiServer\Controllers\EventsController;
@ -42,6 +43,7 @@ class Router
$apiHandler = stack(ApiController::getRouterCallback($client, ApiExtensions::class), $authorization);
$systemApiHandler = stack(SystemController::getRouterCallback($client, SystemApiExtensions::class), $authorization);
$eventsHandler = stack(EventsController::getRouterCallback($client), $authorization);
$logHandler = stack(LogController::getRouterCallback(), $authorization);
foreach (['GET', 'POST'] as $method) {
$this->router->addRoute($method, '/api/{method}[/]', $apiHandler);
@ -52,6 +54,9 @@ class Router
$this->router->addRoute('GET', '/events[/]', $eventsHandler);
$this->router->addRoute('GET', '/events/{session:.*?[^/]}[/]', $eventsHandler);
$this->router->addRoute('GET', '/log[/]', $logHandler);
$this->router->addRoute('GET', '/log/{level:.*?[^/]}[/]', $logHandler);
}