From c6239007aa31f3b5e2e66c2ec4597ec0e0215bb1 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 23 Aug 2024 13:13:01 +0200 Subject: [PATCH] Improve cancellation logic, pre-upload broadcast media --- src/Broadcast/Broadcast.php | 36 ++++++++++++++++++++------ src/InternalDoc.php | 21 ++++++++------- src/Loop/Connection/WriteLoop.php | 15 ----------- src/MTProto/MTProtoOutgoingMessage.php | 26 ++++++++++++++++++- 4 files changed, 65 insertions(+), 33 deletions(-) diff --git a/src/Broadcast/Broadcast.php b/src/Broadcast/Broadcast.php index 60353597c..a50045f96 100644 --- a/src/Broadcast/Broadcast.php +++ b/src/Broadcast/Broadcast.php @@ -20,6 +20,7 @@ declare(strict_types=1); namespace danog\MadelineProto\Broadcast; +use Amp\Cancellation; use danog\MadelineProto\Broadcast\Action\ActionForward; use danog\MadelineProto\Broadcast\Action\ActionSend; use Webmozart\Assert\Assert; @@ -50,13 +51,29 @@ trait Broadcast * MadelineProto will also periodically emit updateBroadcastProgress updates, * containing a Progress object for all broadcasts currently in-progress. * - * @param array $messages The messages to send: an array of arrays, containing parameters to pass to messages.sendMessage. - * @param bool $pin Whether to also pin the last sent message. - * @param float|null $delay Number of seconds to wait between each peer. + * @param array $messages The messages to send: an array of arrays, containing parameters to pass to messages.sendMessage. + * @param bool $pin Whether to also pin the last sent message. + * @param float|null $delay Number of seconds to wait between each peer. + * @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable. */ - public function broadcastMessages(array $messages, ?Filter $filter = null, bool $pin = false, ?float $delay = null): int + public function broadcastMessages(array $messages, ?Filter $filter = null, bool $pin = false, ?float $delay = null, ?Cancellation $cancellation = null): int { - return $this->broadcastCustom(new ActionSend($this, $messages, $pin), $filter, $delay); + foreach ($messages as &$message) { + if (isset($message['media']['_']) && + ( + $message['media']['_'] === 'inputMediaUploadedPhoto' + || $message['media']['_'] === 'inputMediaUploadedDocument' + || $message['media']['_'] === 'inputMediaPhotoExternal' + || $message['media']['_'] === 'inputMediaDocumentExternal' + ) + ) { + $message['media'] = $this->methodCallAsyncRead( + 'messages.uploadMedia', + ['peer' => 'me', 'media' => $message['media'], 'cancellation' => $cancellation] + ); + } + } unset($message); + return $this->broadcastCustom(new ActionSend($this, $messages, $pin), $filter, $delay, $cancellation); } /** * Forwards a list of messages to all peers (users, chats, channels) of the bot. @@ -75,10 +92,11 @@ trait Broadcast * @param bool $drop_author If true, will forward messages without quoting the original author. * @param bool $pin Whether to also pin the last sent message. * @param float|null $delay Number of seconds to wait between each peer. + * @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable. */ - public function broadcastForwardMessages(mixed $from_peer, array $message_ids, bool $drop_author = false, ?Filter $filter = null, bool $pin = false, ?float $delay = null): int + public function broadcastForwardMessages(mixed $from_peer, array $message_ids, bool $drop_author = false, ?Filter $filter = null, bool $pin = false, ?float $delay = null, ?Cancellation $cancellation = null): int { - return $this->broadcastCustom(new ActionForward($this, $this->getID($from_peer), $message_ids, $drop_author, $pin), $filter, $delay); + return $this->broadcastCustom(new ActionForward($this, $this->getID($from_peer), $message_ids, $drop_author, $pin), $filter, $delay, $cancellation); } /** @@ -95,14 +113,16 @@ trait Broadcast * * @param Action $action A custom, serializable Action class that will be called once for every peer. * @param float|null $delay Number of seconds to wait between each peer. + * @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable. */ - public function broadcastCustom(Action $action, ?Filter $filter = null, ?float $delay = null): int + public function broadcastCustom(Action $action, ?Filter $filter = null, ?float $delay = null, ?Cancellation $cancellation = null): int { // Ensure it can be serialized Assert::eq(unserialize(serialize($action))::class, $action::class); $id = $this->broadcastId--; $this->broadcasts[$id] = new InternalState($id, $this, $action, $filter ?? Filter::default(), $delay); + $cancellation?->subscribe(fn () => $this->cancelBroadcast($id)); return $id; } /** diff --git a/src/InternalDoc.php b/src/InternalDoc.php index 5fb136e13..9ecb200ab 100644 --- a/src/InternalDoc.php +++ b/src/InternalDoc.php @@ -250,10 +250,11 @@ abstract class InternalDoc * * @param Action $action A custom, serializable Action class that will be called once for every peer. * @param float|null $delay Number of seconds to wait between each peer. + * @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable. */ - final public function broadcastCustom(\danog\MadelineProto\Broadcast\Action $action, ?\danog\MadelineProto\Broadcast\Filter $filter = null, ?float $delay = null): int + final public function broadcastCustom(\danog\MadelineProto\Broadcast\Action $action, ?\danog\MadelineProto\Broadcast\Filter $filter = null, ?float $delay = null, ?\Amp\Cancellation $cancellation = null): int { - return $this->wrapper->getAPI()->broadcastCustom($action, $filter, $delay); + return $this->wrapper->getAPI()->broadcastCustom($action, $filter, $delay, $cancellation); } /** * Forwards a list of messages to all peers (users, chats, channels) of the bot. @@ -272,10 +273,11 @@ abstract class InternalDoc * @param bool $drop_author If true, will forward messages without quoting the original author. * @param bool $pin Whether to also pin the last sent message. * @param float|null $delay Number of seconds to wait between each peer. + * @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable. */ - final public function broadcastForwardMessages(mixed $from_peer, array $message_ids, bool $drop_author = false, ?\danog\MadelineProto\Broadcast\Filter $filter = null, bool $pin = false, ?float $delay = null): int + final public function broadcastForwardMessages(mixed $from_peer, array $message_ids, bool $drop_author = false, ?\danog\MadelineProto\Broadcast\Filter $filter = null, bool $pin = false, ?float $delay = null, ?\Amp\Cancellation $cancellation = null): int { - return $this->wrapper->getAPI()->broadcastForwardMessages($from_peer, $message_ids, $drop_author, $filter, $pin, $delay); + return $this->wrapper->getAPI()->broadcastForwardMessages($from_peer, $message_ids, $drop_author, $filter, $pin, $delay, $cancellation); } /** * Sends a list of messages to all peers (users, chats, channels) of the bot. @@ -291,13 +293,14 @@ abstract class InternalDoc * MadelineProto will also periodically emit updateBroadcastProgress updates, * containing a Progress object for all broadcasts currently in-progress. * - * @param array $messages The messages to send: an array of arrays, containing parameters to pass to messages.sendMessage. - * @param bool $pin Whether to also pin the last sent message. - * @param float|null $delay Number of seconds to wait between each peer. + * @param array $messages The messages to send: an array of arrays, containing parameters to pass to messages.sendMessage. + * @param bool $pin Whether to also pin the last sent message. + * @param float|null $delay Number of seconds to wait between each peer. + * @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable. */ - final public function broadcastMessages(array $messages, ?\danog\MadelineProto\Broadcast\Filter $filter = null, bool $pin = false, ?float $delay = null): int + final public function broadcastMessages(array $messages, ?\danog\MadelineProto\Broadcast\Filter $filter = null, bool $pin = false, ?float $delay = null, ?\Amp\Cancellation $cancellation = null): int { - return $this->wrapper->getAPI()->broadcastMessages($messages, $filter, $pin, $delay); + return $this->wrapper->getAPI()->broadcastMessages($messages, $filter, $pin, $delay, $cancellation); } /** * Fork a new green thread and execute the passed function in the background. diff --git a/src/Loop/Connection/WriteLoop.php b/src/Loop/Connection/WriteLoop.php index ecf07b569..7bdcb69f7 100644 --- a/src/Loop/Connection/WriteLoop.php +++ b/src/Loop/Connection/WriteLoop.php @@ -372,21 +372,6 @@ final class WriteLoop extends Loop $this->connection->new_outgoing[$message_id] = $message; } $message->sent(); - $message->cancellation?->subscribe(function () use ($message): void { - $this->connection->requestResponse?->inc([ - 'method' => $message->constructor, - 'error_message' => 'Request Timeout', - 'error_code' => '408', - ]); - - if ($message->hasMsgId()) { - $this->API->logger("Cancelling $message..."); - $this->API->logger($this->connection->methodCallAsyncRead( - 'rpc_drop_answer', - ['req_msg_id' => $message->getMsgId()] - )); - } - }); } $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); } while ($this->connection->pendingOutgoing && !$skipped); diff --git a/src/MTProto/MTProtoOutgoingMessage.php b/src/MTProto/MTProtoOutgoingMessage.php index 47ca658a9..8617a1731 100644 --- a/src/MTProto/MTProtoOutgoingMessage.php +++ b/src/MTProto/MTProtoOutgoingMessage.php @@ -131,7 +131,31 @@ class MTProtoOutgoingMessage extends MTProtoMessage $this->userRelated = $constructor === 'users.getUsers' && $body === ['id' => [['_' => 'inputUserSelf']]] || $constructor === 'auth.exportAuthorization' || $constructor === 'updates.getDifference'; parent::__construct(!isset(MTProtoMessage::NOT_CONTENT_RELATED[$constructor])); - $cancellation?->subscribe(fn (CancelledException $e) => $this->reply(static fn () => throw $e)); + + $cancellation?->subscribe(function (CancelledException $e): void { + if ($this->hasReply()) { + return; + } + if (!$this->wasSent()) { + $this->reply(static fn () => throw $e); + return; + } + $this->reply(static fn () => throw $e); + + $this->connection->requestResponse?->inc([ + 'method' => $this->constructor, + 'error_message' => 'Request Timeout', + 'error_code' => '408', + ]); + + if ($this->hasMsgId()) { + $this->connection->API->logger("Cancelling $this..."); + $this->connection->API->logger($this->connection->methodCallAsyncRead( + 'rpc_drop_answer', + ['req_msg_id' => $this->getMsgId()] + )); + } + }); } /**