Proper bulking

This commit is contained in:
Daniil Gentili 2024-07-15 10:07:14 +02:00
parent 55775897d6
commit 1101f65388
3 changed files with 28 additions and 56 deletions

View File

@ -14,8 +14,6 @@ use ReflectionProperty;
use Revolt\EventLoop; use Revolt\EventLoop;
use RuntimeException; use RuntimeException;
use TelegramApiServer\EventObservers\EventObserver; use TelegramApiServer\EventObservers\EventObserver;
use function Amp\async;
use function Amp\delay;
final class Client final class Client
{ {
@ -211,10 +209,11 @@ final class Client
$resume = Config::getInstance()->get('error.resume_on_error'); $resume = Config::getInstance()->get('error.resume_on_error');
$currentHandler = EventLoop::getErrorHandler(); $currentHandler = EventLoop::getErrorHandler();
EventLoop::setErrorHandler(static fn(\Throwable $e) => self::errorHandler($e, $currentHandler, $token, $peers, $resume)); EventLoop::setErrorHandler(static fn (\Throwable $e) => self::errorHandler($e, $currentHandler, $token, $peers, $resume));
} }
private static function errorHandler(\Throwable $e, ?callable $currentHandler, string $token, array $peers, bool $resume): void { private static function errorHandler(\Throwable $e, ?callable $currentHandler, string $token, array $peers, bool $resume): void
{
if ($currentHandler) { if ($currentHandler) {
$currentHandler($e); $currentHandler($e);
} }
@ -223,33 +222,32 @@ final class Client
} }
if ($peers && $token) { if ($peers && $token) {
try { try {
$ch = curl_init("https://api.telegram.org/bot$token/sendMessage"); $ch = \curl_init("https://api.telegram.org/bot$token/sendMessage");
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); \curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST');
curl_setopt($ch, CURLOPT_HTTPHEADER, array('Content-Type: application/json')); \curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); \curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT,1); \curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 1);
curl_setopt($ch, CURLOPT_TIMEOUT, 5); \curl_setopt($ch, CURLOPT_TIMEOUT, 5);
foreach ($peers as $peer) { foreach ($peers as $peer) {
$exceptionArray = Logger::getExceptionAsArray($e); $exceptionArray = Logger::getExceptionAsArray($e);
unset($exceptionArray['previous_exception']); unset($exceptionArray['previous_exception']);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode([ \curl_setopt($ch, CURLOPT_POSTFIELDS, \json_encode([
'chat_id' => $peer, 'chat_id' => $peer,
'text' => "```json\n" . 'text' => "```json\n" .
json_encode($exceptionArray, JSON_UNESCAPED_SLASHES|JSON_UNESCAPED_UNICODE|JSON_PRETTY_PRINT) . \json_encode($exceptionArray, JSON_UNESCAPED_SLASHES|JSON_UNESCAPED_UNICODE|JSON_PRETTY_PRINT) .
"\n```" "\n```",
,
'parse_mode' => 'MarkdownV2', 'parse_mode' => 'MarkdownV2',
])); ]));
$response = curl_exec($ch); $response = \curl_exec($ch);
if (curl_getinfo($ch, CURLINFO_HTTP_CODE) !== 200) { if (\curl_getinfo($ch, CURLINFO_HTTP_CODE) !== 200) {
Logger::getInstance()->error('Error notification bot response', [ Logger::getInstance()->error('Error notification bot response', [
'response' => $response, 'response' => $response,
'error_code' => curl_errno($ch), 'error_code' => \curl_errno($ch),
'error' => curl_error($ch), 'error' => \curl_error($ch),
]); ]);
} }

View File

@ -3,14 +3,11 @@
namespace TelegramApiServer\Controllers; namespace TelegramApiServer\Controllers;
use Amp\DeferredFuture; use Amp\DeferredFuture;
use Amp\Future;
use Exception; use Exception;
use Revolt\EventLoop;
use TelegramApiServer\Client; use TelegramApiServer\Client;
use TelegramApiServer\Config; use TelegramApiServer\Config;
use TelegramApiServer\Logger;
use function Amp\async;
use function Amp\delay;
use function Amp\Future\await;
use function Amp\Future\awaitAll;
final class ApiController extends AbstractApiController final class ApiController extends AbstractApiController
{ {
@ -28,6 +25,7 @@ final class ApiController extends AbstractApiController
$this->api = \explode('.', $path['method'] ?? ''); $this->api = \explode('.', $path['method'] ?? '');
} }
private static ?Future $w = null;
/** /**
* @throws Exception * @throws Exception
*/ */
@ -40,40 +38,16 @@ final class ApiController extends AbstractApiController
return $this->callApiCommon($madelineProto); return $this->callApiCommon($madelineProto);
} }
//GROUP REQUESTS IN BULKS if (!self::$w) {
/** @var ?DeferredFuture $lock */ $f = new DeferredFuture;
static $lock = null; self::$w = $f->getFuture();
EventLoop::delay(0.001, static function () use ($f): void {
if (!$lock) { self::$w = null;
try { $f->complete();
$lock = new DeferredFuture(); });
delay($this->waitNextTick());
$lock->complete();
} finally {
$lock = null;
}
} else {
$lock->getFuture()->await();
} }
self::$w->await();
return $this->callApiCommon($madelineProto); return $this->callApiCommon($madelineProto);
} }
/**
* Sync threads execution via time ticks
* Need to enable madelineProto futures bulk execution
* @param float $tick interval of execution in seconds.
*/
protected function waitNextTick(float $tick = 0.5): float {
$tickMs = (int)($tick * 1000);
$now = (int)(microtime(true) * 1000);
$currentTick = intdiv((int)(microtime(true) * 1000), $tickMs);
$nextTick = ($currentTick + 1);
$nextTickTime = $nextTick * $tickMs;
$wait = round(($nextTickTime - $now)/1000, 3);
Logger::getInstance()->notice("Waiting $wait seconds before tick");
return $wait;
}
} }

View File

@ -203,7 +203,7 @@ final class Logger extends AbstractLogger
'file' => $exception->getFile(), 'file' => $exception->getFile(),
'line' => $exception->getLine(), 'line' => $exception->getLine(),
'code' => $exception->getCode(), 'code' => $exception->getCode(),
'backtrace' => array_slice($exception->getTrace(), 0, 3), 'backtrace' => \array_slice($exception->getTrace(), 0, 3),
'previous_exception' => $exception->getPrevious(), 'previous_exception' => $exception->getPrevious(),
]; ];
} }