1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-11-26 17:44:44 +01:00

Improve cancellation logic, pre-upload broadcast media

This commit is contained in:
Daniil Gentili 2024-08-23 13:13:01 +02:00
parent 0eb361cda5
commit c6239007aa
4 changed files with 65 additions and 33 deletions

View File

@ -20,6 +20,7 @@ declare(strict_types=1);
namespace danog\MadelineProto\Broadcast;
use Amp\Cancellation;
use danog\MadelineProto\Broadcast\Action\ActionForward;
use danog\MadelineProto\Broadcast\Action\ActionSend;
use Webmozart\Assert\Assert;
@ -50,13 +51,29 @@ trait Broadcast
* MadelineProto will also periodically emit updateBroadcastProgress updates,
* containing a Progress object for all broadcasts currently in-progress.
*
* @param array $messages The messages to send: an array of arrays, containing parameters to pass to messages.sendMessage.
* @param bool $pin Whether to also pin the last sent message.
* @param float|null $delay Number of seconds to wait between each peer.
* @param array $messages The messages to send: an array of arrays, containing parameters to pass to messages.sendMessage.
* @param bool $pin Whether to also pin the last sent message.
* @param float|null $delay Number of seconds to wait between each peer.
* @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable.
*/
public function broadcastMessages(array $messages, ?Filter $filter = null, bool $pin = false, ?float $delay = null): int
public function broadcastMessages(array $messages, ?Filter $filter = null, bool $pin = false, ?float $delay = null, ?Cancellation $cancellation = null): int
{
return $this->broadcastCustom(new ActionSend($this, $messages, $pin), $filter, $delay);
foreach ($messages as &$message) {
if (isset($message['media']['_']) &&
(
$message['media']['_'] === 'inputMediaUploadedPhoto'
|| $message['media']['_'] === 'inputMediaUploadedDocument'
|| $message['media']['_'] === 'inputMediaPhotoExternal'
|| $message['media']['_'] === 'inputMediaDocumentExternal'
)
) {
$message['media'] = $this->methodCallAsyncRead(
'messages.uploadMedia',
['peer' => 'me', 'media' => $message['media'], 'cancellation' => $cancellation]
);
}
} unset($message);
return $this->broadcastCustom(new ActionSend($this, $messages, $pin), $filter, $delay, $cancellation);
}
/**
* Forwards a list of messages to all peers (users, chats, channels) of the bot.
@ -75,10 +92,11 @@ trait Broadcast
* @param bool $drop_author If true, will forward messages without quoting the original author.
* @param bool $pin Whether to also pin the last sent message.
* @param float|null $delay Number of seconds to wait between each peer.
* @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable.
*/
public function broadcastForwardMessages(mixed $from_peer, array $message_ids, bool $drop_author = false, ?Filter $filter = null, bool $pin = false, ?float $delay = null): int
public function broadcastForwardMessages(mixed $from_peer, array $message_ids, bool $drop_author = false, ?Filter $filter = null, bool $pin = false, ?float $delay = null, ?Cancellation $cancellation = null): int
{
return $this->broadcastCustom(new ActionForward($this, $this->getID($from_peer), $message_ids, $drop_author, $pin), $filter, $delay);
return $this->broadcastCustom(new ActionForward($this, $this->getID($from_peer), $message_ids, $drop_author, $pin), $filter, $delay, $cancellation);
}
/**
@ -95,14 +113,16 @@ trait Broadcast
*
* @param Action $action A custom, serializable Action class that will be called once for every peer.
* @param float|null $delay Number of seconds to wait between each peer.
* @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable.
*/
public function broadcastCustom(Action $action, ?Filter $filter = null, ?float $delay = null): int
public function broadcastCustom(Action $action, ?Filter $filter = null, ?float $delay = null, ?Cancellation $cancellation = null): int
{
// Ensure it can be serialized
Assert::eq(unserialize(serialize($action))::class, $action::class);
$id = $this->broadcastId--;
$this->broadcasts[$id] = new InternalState($id, $this, $action, $filter ?? Filter::default(), $delay);
$cancellation?->subscribe(fn () => $this->cancelBroadcast($id));
return $id;
}
/**

View File

@ -250,10 +250,11 @@ abstract class InternalDoc
*
* @param Action $action A custom, serializable Action class that will be called once for every peer.
* @param float|null $delay Number of seconds to wait between each peer.
* @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable.
*/
final public function broadcastCustom(\danog\MadelineProto\Broadcast\Action $action, ?\danog\MadelineProto\Broadcast\Filter $filter = null, ?float $delay = null): int
final public function broadcastCustom(\danog\MadelineProto\Broadcast\Action $action, ?\danog\MadelineProto\Broadcast\Filter $filter = null, ?float $delay = null, ?\Amp\Cancellation $cancellation = null): int
{
return $this->wrapper->getAPI()->broadcastCustom($action, $filter, $delay);
return $this->wrapper->getAPI()->broadcastCustom($action, $filter, $delay, $cancellation);
}
/**
* Forwards a list of messages to all peers (users, chats, channels) of the bot.
@ -272,10 +273,11 @@ abstract class InternalDoc
* @param bool $drop_author If true, will forward messages without quoting the original author.
* @param bool $pin Whether to also pin the last sent message.
* @param float|null $delay Number of seconds to wait between each peer.
* @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable.
*/
final public function broadcastForwardMessages(mixed $from_peer, array $message_ids, bool $drop_author = false, ?\danog\MadelineProto\Broadcast\Filter $filter = null, bool $pin = false, ?float $delay = null): int
final public function broadcastForwardMessages(mixed $from_peer, array $message_ids, bool $drop_author = false, ?\danog\MadelineProto\Broadcast\Filter $filter = null, bool $pin = false, ?float $delay = null, ?\Amp\Cancellation $cancellation = null): int
{
return $this->wrapper->getAPI()->broadcastForwardMessages($from_peer, $message_ids, $drop_author, $filter, $pin, $delay);
return $this->wrapper->getAPI()->broadcastForwardMessages($from_peer, $message_ids, $drop_author, $filter, $pin, $delay, $cancellation);
}
/**
* Sends a list of messages to all peers (users, chats, channels) of the bot.
@ -291,13 +293,14 @@ abstract class InternalDoc
* MadelineProto will also periodically emit updateBroadcastProgress updates,
* containing a Progress object for all broadcasts currently in-progress.
*
* @param array $messages The messages to send: an array of arrays, containing parameters to pass to messages.sendMessage.
* @param bool $pin Whether to also pin the last sent message.
* @param float|null $delay Number of seconds to wait between each peer.
* @param array $messages The messages to send: an array of arrays, containing parameters to pass to messages.sendMessage.
* @param bool $pin Whether to also pin the last sent message.
* @param float|null $delay Number of seconds to wait between each peer.
* @param ?Cancellation $cancellation Cancellation. Note: you may also use cancelBroadcast with the returned broadcast ID. Be aware that when running via web with limited execution time, the broadcast will continue correctly after a restart and cancelBroadcast will still be usable, but the cancellation that is passed here will not be usable.
*/
final public function broadcastMessages(array $messages, ?\danog\MadelineProto\Broadcast\Filter $filter = null, bool $pin = false, ?float $delay = null): int
final public function broadcastMessages(array $messages, ?\danog\MadelineProto\Broadcast\Filter $filter = null, bool $pin = false, ?float $delay = null, ?\Amp\Cancellation $cancellation = null): int
{
return $this->wrapper->getAPI()->broadcastMessages($messages, $filter, $pin, $delay);
return $this->wrapper->getAPI()->broadcastMessages($messages, $filter, $pin, $delay, $cancellation);
}
/**
* Fork a new green thread and execute the passed function in the background.

View File

@ -372,21 +372,6 @@ final class WriteLoop extends Loop
$this->connection->new_outgoing[$message_id] = $message;
}
$message->sent();
$message->cancellation?->subscribe(function () use ($message): void {
$this->connection->requestResponse?->inc([
'method' => $message->constructor,
'error_message' => 'Request Timeout',
'error_code' => '408',
]);
if ($message->hasMsgId()) {
$this->API->logger("Cancelling $message...");
$this->API->logger($this->connection->methodCallAsyncRead(
'rpc_drop_answer',
['req_msg_id' => $message->getMsgId()]
));
}
});
}
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
} while ($this->connection->pendingOutgoing && !$skipped);

View File

@ -131,7 +131,31 @@ class MTProtoOutgoingMessage extends MTProtoMessage
$this->userRelated = $constructor === 'users.getUsers' && $body === ['id' => [['_' => 'inputUserSelf']]] || $constructor === 'auth.exportAuthorization' || $constructor === 'updates.getDifference';
parent::__construct(!isset(MTProtoMessage::NOT_CONTENT_RELATED[$constructor]));
$cancellation?->subscribe(fn (CancelledException $e) => $this->reply(static fn () => throw $e));
$cancellation?->subscribe(function (CancelledException $e): void {
if ($this->hasReply()) {
return;
}
if (!$this->wasSent()) {
$this->reply(static fn () => throw $e);
return;
}
$this->reply(static fn () => throw $e);
$this->connection->requestResponse?->inc([
'method' => $this->constructor,
'error_message' => 'Request Timeout',
'error_code' => '408',
]);
if ($this->hasMsgId()) {
$this->connection->API->logger("Cancelling $this...");
$this->connection->API->logger($this->connection->methodCallAsyncRead(
'rpc_drop_answer',
['req_msg_id' => $this->getMsgId()]
));
}
});
}
/**