Start eventHandler for session only if there is listeners on websocket channel.

Fix to do not trigger bot api updates.
This commit is contained in:
Alexander Pankratov 2020-04-13 00:34:51 +03:00
parent ed4716c4a6
commit 0409c0c7b9
10 changed files with 134 additions and 69 deletions

View File

@ -82,7 +82,6 @@ foreach ($options['session'] as $session) {
}
new TelegramApiServer\Server\Server(
new TelegramApiServer\Client(),
$options,
$sessions
);

View File

@ -10,13 +10,22 @@ use InvalidArgumentException;
use Psr\Log\LogLevel;
use RuntimeException;
use TelegramApiServer\EventObservers\EventHandler;
use TelegramApiServer\EventObservers\EventObserver;
use function Amp\call;
class Client
{
public static Client $self;
/** @var MadelineProto\API[] */
public array $instances = [];
public static function getInstance(): Client {
if (empty(static::$self)) {
static::$self = new static();
}
return static::$self;
}
private static function isSessionLoggedIn(MadelineProto\API $instance): bool
{
return ($instance->API->authorized ?? MTProto::NOT_LOGGED_IN) === MTProto::LOGGED_IN;
@ -62,17 +71,14 @@ class Client
throw new InvalidArgumentException('Session not found');
}
$this->instances[$session]->setNoop();
EventObserver::stopEventHandler($session);
$this->instances[$session]->stop();
/** @see runSession() */
//Mark this session as not logged in, so no other actions will be made.
$this->instances[$session]->API->authorized = MTProto::NOT_LOGGED_IN;
unset(
$this->instances[$session],
EventHandler::$instances[$session]
);
unset($this->instances[$session]);
}
/**
@ -80,11 +86,11 @@ class Client
*
* @return MadelineProto\API
*/
public function getInstance(?string $session = null): MadelineProto\API
public function getSession(?string $session = null): MadelineProto\API
{
if (!$this->instances) {
throw new RuntimeException(
'No sessions available. Use combinedApi or restart server with --session option'
'No sessions available. Call /system/addSession?session=%session_name% or restart server with --session option'
);
}
@ -137,7 +143,6 @@ class Client
function() use ($instance) {
if (static::isSessionLoggedIn($instance)) {
yield $instance->start();
yield $instance->setEventHandler(EventHandler::class);
Loop::defer(fn() => $this->loop($instance));
}
}
@ -154,12 +159,7 @@ class Client
$e->getMessage(),
[
'probable_session' => $sessionName,
'exception' => [
'exception' => get_class($e),
'code' => $e->getCode(),
'file' => $e->getFile(),
'line' => $e->getLine(),
],
'exception' => Logger::getExceptionAsArray($e),
]
);
foreach ($this->getBrokenSessions() as $session) {

View File

@ -14,7 +14,7 @@ use Amp\Promise;
use danog\MadelineProto\API;
use danog\MadelineProto\CombinedAPI;
use danog\MadelineProto\TON\API as TonAPI;
use TelegramApiServer\Client;
use TelegramApiServer\Logger;
use TelegramApiServer\MadelineProtoExtensions\ApiExtensions;
use TelegramApiServer\MadelineProtoExtensions\SystemApiExtensions;
@ -22,7 +22,6 @@ abstract class AbstractApiController
{
public const JSON_HEADER = ['Content-Type'=>'application/json;charset=utf-8'];
protected Client $client;
protected Request $request;
protected ?File $file = null;
protected $extensionClass;
@ -41,11 +40,11 @@ abstract class AbstractApiController
abstract protected function resolvePath(array $path);
abstract protected function callApi();
public static function getRouterCallback(Client $client, $extensionClass): CallableRequestHandler
public static function getRouterCallback($extensionClass): CallableRequestHandler
{
return new CallableRequestHandler(
static function (Request $request) use($client, $extensionClass) {
$requestCallback = new static($client, $request, $extensionClass);
static function (Request $request) use($extensionClass) {
$requestCallback = new static($request, $extensionClass);
$response = yield from $requestCallback->process();
if ($response instanceof Response) {
@ -60,9 +59,8 @@ abstract class AbstractApiController
);
}
public function __construct(Client $client, Request $request, $extensionClass = null)
public function __construct(Request $request, $extensionClass = null)
{
$this->client = $client;
$this->request = $request;
$this->extensionClass = $extensionClass;
}
@ -144,12 +142,7 @@ abstract class AbstractApiController
}
} catch (\Throwable $e) {
error($e->getMessage(), [
'exception' => get_class($e),
'code' => $e->getCode(),
'file' => $e->getFile(),
'line' => $e->getLine(),
]);
error($e->getMessage(), Logger::getExceptionAsArray($e));
$this->setError($e);
}
@ -202,13 +195,7 @@ abstract class AbstractApiController
$this->setPageCode(400);
}
$this->page['errors'][] = [
'message' => $e->getMessage(),
'exception' => get_class($e),
'code' => $e->getCode(),
'file' => $e->getFile(),
'line' => $e->getLine(),
];
$this->page['errors'][] = Logger::getExceptionAsArray($e);
return $this;
}

View File

@ -3,6 +3,7 @@
namespace TelegramApiServer\Controllers;
use Amp\Promise;
use TelegramApiServer\Client;
class ApiController extends AbstractApiController
{
@ -27,7 +28,7 @@ class ApiController extends AbstractApiController
*/
protected function callApi()
{
$madelineProto = $this->client->getInstance($this->session);
$madelineProto = Client::getInstance()->getSession($this->session);
return $this->callApiCommon($madelineProto);
}

View File

@ -7,31 +7,31 @@ use Amp\Http\Server\Response;
use Amp\Http\Server\Router;
use Amp\Promise;
use Amp\Success;
use Amp\Websocket\Client as WebsocketClient;
use Amp\Websocket\Server\ClientHandler;
use Amp\Websocket\Server\Websocket;
use Amp\Websocket\Server\Websocket as WebsocketServer;
use TelegramApiServer\Client;
use TelegramApiServer\EventObservers\EventObserver;
use function Amp\call;
class EventsController implements ClientHandler
{
private Client $client;
private ?Websocket $endpoint;
private ?WebsocketServer $endpoint;
public static function getRouterCallback(Client $client): Websocket
public static function getRouterCallback(): WebsocketServer
{
$class = new static();
$class->client = $client;
return new Websocket($class);
return new WebsocketServer($class);
}
public function onStart(Websocket $endpoint): Promise
public function onStart(WebsocketServer $endpoint): Promise
{
$this->endpoint = $endpoint;
return new Success;
}
public function onStop(Websocket $endpoint): Promise
public function onStop(WebsocketServer $endpoint): Promise
{
$this->endpoint = null;
return new Success;
@ -42,7 +42,7 @@ class EventsController implements ClientHandler
try {
$session = $request->getAttribute(Router::class)['session'] ?? null;
if ($session) {
$this->client->getInstance($session);
Client::getInstance()->getSession($session);
}
} catch (\Throwable $e){
$response->setStatus(400);
@ -51,11 +51,11 @@ class EventsController implements ClientHandler
return new Success($response);
}
public function handleClient(\Amp\Websocket\Client $client, Request $request, Response $response): Promise
public function handleClient(WebsocketClient $client, Request $request, Response $response): Promise
{
return call(function() use($client, $request) {
$requestedSession = $request->getAttribute(Router::class)['session'] ?? null;
$this->subscribeForUpdates($client, $requestedSession);
yield from $this->subscribeForUpdates($client, $requestedSession);
while ($message = yield $client->receive()) {
// Messages received on the connection are ignored and discarded.
@ -64,12 +64,15 @@ class EventsController implements ClientHandler
});
}
private function subscribeForUpdates(\Amp\Websocket\Client $client, ?string $requestedSession): void
private function subscribeForUpdates(WebsocketClient $client, ?string $requestedSession): \Generator
{
$clientId = $client->getId();
$client->onClose(static function() use($clientId) {
yield EventObserver::startEventHandler($requestedSession);
$client->onClose(static function() use($clientId, $requestedSession) {
EventObserver::removeSubscriber($clientId);
EventObserver::stopEventHandler($requestedSession);
});
EventObserver::addSubscriber($clientId, function($update, ?string $session) use($clientId, $requestedSession) {

View File

@ -3,6 +3,7 @@
namespace TelegramApiServer\Controllers;
use Amp\Promise;
use TelegramApiServer\Client;
class SystemController extends AbstractApiController
{
@ -23,7 +24,7 @@ class SystemController extends AbstractApiController
*/
protected function callApi()
{
$madelineProtoExtensions = new $this->extensionClass($this->client);
$madelineProtoExtensions = new $this->extensionClass(Client::getInstance());
$result = $madelineProtoExtensions->{$this->api[0]}(...$this->parameters);
return $result;
}

View File

@ -3,10 +3,18 @@
namespace TelegramApiServer\EventObservers;
use Amp\Promise;
use TelegramApiServer\Client;
use TelegramApiServer\Logger;
use function Amp\call;
class EventObserver
{
use ObserverTrait;
/** @var int[] */
private static array $sessionClients = [];
public static function notify(array $update, string $sessionName) {
foreach (static::$subscribers as $clientId => $callback) {
notice("Pass update to callback. ClientId: {$clientId}");
@ -14,4 +22,66 @@ class EventObserver
}
}
private static function addSessionClient(string $session): void
{
if (empty(static::$sessionClients[$session])) {
static::$sessionClients[$session] = 0;
}
++static::$sessionClients[$session];
}
private static function removeSessionClient(string $session): void
{
if (!empty(static::$sessionClients[$session])) {
--static::$sessionClients[$session];
}
}
public static function startEventHandler(?string $requestedSession = null): Promise
{
return call(static function() use($requestedSession) {
$sessions = [];
if ($requestedSession === null) {
$sessions = array_keys(Client::getInstance()->instances);
} else {
$sessions[] = $requestedSession;
}
foreach ($sessions as $session) {
static::addSessionClient($session);
if (static::$sessionClients[$session] === 1) {
try {
warning("Start EventHandler: {$session}");
yield Client::getInstance()->getSession($session)->setEventHandler(EventHandler::class);
} catch (\Throwable $e) {
static::removeSessionClient($session);
error('Cant set EventHandler', [
'session' => $session,
'exception' => Logger::getExceptionAsArray($e)
]);
}
}
}
});
}
public static function stopEventHandler(?string $requestedSession = null): void
{
$sessions = [];
if ($requestedSession === null) {
$sessions = array_keys(Client::getInstance()->instances);
} else {
$sessions[] = $requestedSession;
}
foreach ($sessions as $session) {
static::removeSessionClient($session);
if (empty(static::$sessionClients[$session])) {
warning("Stop EventHandler: {$session}");
Client::getInstance()->getSession($session)->setNoop();
unset(EventHandler::$instances[$session]);
}
}
}
}

View File

@ -154,4 +154,14 @@ class Logger extends AbstractLogger
: ''
) . PHP_EOL;
}
public static function getExceptionAsArray(\Throwable $exception) {
return [
'exception' => get_class($exception),
'message' => $exception->getMessage(),
'file' => $exception->getFile(),
'line' => $exception->getLine(),
'code' => $exception->getCode(),
];
}
}

View File

@ -4,7 +4,6 @@ namespace TelegramApiServer\Server;
use Amp\Http\Server\Request;
use Amp\Http\Server\RequestHandler\CallableRequestHandler;
use TelegramApiServer\Client;
use TelegramApiServer\Controllers\LogController;
use TelegramApiServer\Controllers\SystemController;
use TelegramApiServer\Controllers\ApiController;
@ -18,10 +17,10 @@ class Router
{
private \Amp\Http\Server\Router $router;
public function __construct(Client $client)
public function __construct()
{
$this->router = new \Amp\Http\Server\Router();
$this->setRoutes($client);
$this->setRoutes();
$this->setFallback();
}
@ -37,12 +36,12 @@ class Router
}));
}
private function setRoutes($client): void
private function setRoutes(): void
{
$authorization = new Authorization();
$apiHandler = stack(ApiController::getRouterCallback($client, ApiExtensions::class), $authorization);
$systemApiHandler = stack(SystemController::getRouterCallback($client, SystemApiExtensions::class), $authorization);
$eventsHandler = stack(EventsController::getRouterCallback($client), $authorization);
$apiHandler = stack(ApiController::getRouterCallback(ApiExtensions::class), $authorization);
$systemApiHandler = stack(SystemController::getRouterCallback(SystemApiExtensions::class), $authorization);
$eventsHandler = stack(EventsController::getRouterCallback(), $authorization);
$logHandler = stack(LogController::getRouterCallback(), $authorization);
foreach (['GET', 'POST'] as $method) {

View File

@ -12,16 +12,15 @@ class Server
/**
* Server constructor.
*
* @param Client $client
* @param array $options
* @param array|null $sessionFiles
*/
public function __construct(Client $client, array $options, ?array $sessionFiles)
public function __construct(array $options, ?array $sessionFiles)
{
Amp\Loop::defer(function () use ($client, $options, $sessionFiles) {
Amp\Loop::defer(function () use ($options, $sessionFiles) {
$server = new Amp\Http\Server\Server(
$this->getServerAddresses(static::getConfig($options)),
(new Router($client))->getRouter(),
(new Router())->getRouter(),
Logger::getInstance(),
(new Amp\Http\Server\Options())
->withCompression()
@ -31,7 +30,7 @@ class Server
);
$server->start();
$client->connect($sessionFiles);
Client::getInstance()->connect($sessionFiles);
$this->registerShutdown($server);
});
@ -41,15 +40,11 @@ class Server
Amp\Loop::run();
} catch (\Throwable $e) {
alert($e->getMessage(), [
'exception' => [
'code' => $e->getCode(),
'file' => $e->getFile(),
'line' => $e->getLine(),
],
'exception' => Logger::getExceptionAsArray($e),
]);
foreach ($client->getBrokenSessions() as $session) {
$client->removeSession($session);
foreach (Client::getInstance()->getBrokenSessions() as $session) {
Client::getInstance()->removeSession($session);
}
}
}