1
0
mirror of https://github.com/danog/MadelineProto.git synced 2025-01-22 16:51:16 +01:00

Allow playing streams via IPC

This commit is contained in:
Daniil Gentili 2023-08-20 14:17:21 +02:00
parent 8b1a8f953a
commit 1458a2b42b
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
10 changed files with 86 additions and 43 deletions

View File

@ -579,6 +579,7 @@ Want to add your own open-source project to this list? [Click here!](https://doc
* <a href="https://docs.madelineproto.xyz/API_docs/methods/stats.getMegagroupStats.html" name="stats.getMegagroupStats">Get supergroup statistics: stats.getMegagroupStats</a>
* <a href="https://docs.madelineproto.xyz/API_docs/methods/account.getTmpPassword.html" name="account.getTmpPassword">Get temporary payment password: account.getTmpPassword</a>
* <a href="https://docs.madelineproto.xyz/API_docs/methods/channels.getAdminLog.html" name="channels.getAdminLog">Get the admin log of a channel/supergroup: channels.getAdminLog</a>
* <a href="https://docs.madelineproto.xyz/PHP/danog/MadelineProto/API.html#callgetcurrent-int-id-danog-madelineproto-remoteurl-danog-madelineproto-localfile-string-null" name="callGetCurrent">Get the file that is currently being played: callGetCurrent</a>
* <a href="https://docs.madelineproto.xyz/API_docs/methods/messages.getSearchCounters.html" name="messages.getSearchCounters">Get the number of results that would be found by a messages.search call with the same parameters: messages.getSearchCounters</a>
* <a href="https://docs.madelineproto.xyz/API_docs/methods/channels.getParticipants.html" name="channels.getParticipants">Get the participants of a supergroup/channel: channels.getParticipants</a>
* <a href="https://docs.madelineproto.xyz/PHP/danog/MadelineProto/API.html#getcallbypeer-int-userid-danog-madelineproto-voip" name="getCallByPeer">Get the phone call with the specified user ID: getCallByPeer</a>
@ -835,7 +836,6 @@ Want to add your own open-source project to this list? [Click here!](https://doc
* <a href="https://docs.madelineproto.xyz/API_docs/methods/channels.toggleJoinToSend.html" name="channels.toggleJoinToSend">Set whether all users should join a discussion group in order to comment on a post »: channels.toggleJoinToSend</a>
* <a href="https://docs.madelineproto.xyz/API_docs/methods/channels.toggleJoinRequest.html" name="channels.toggleJoinRequest">Set whether all users should request admin approval to join the group »: channels.toggleJoinRequest</a>
* <a href="https://docs.madelineproto.xyz/API_docs/methods/bots.setBotMenuButton.html" name="bots.setBotMenuButton">Sets the menu button action » for a given user or for all users: bots.setBotMenuButton</a>
* <a href="https://docs.madelineproto.xyz/PHP/danog/MadelineProto/API.html#setuplogger-void" name="setupLogger">Setup logger: setupLogger</a>
* <a href="https://docs.madelineproto.xyz/API_docs/methods/messages.hidePeerSettingsBar.html" name="messages.hidePeerSettingsBar">Should be called after the user hides the report spam/add as contact bar of a new chat, effectively prevents the user from executing the actions specified in the peer's settings: messages.hidePeerSettingsBar</a>
* <a href="https://docs.madelineproto.xyz/API_docs/methods/messages.togglePeerTranslations.html" name="messages.togglePeerTranslations">Show or hide the real-time chat translation popup for a certain chat: messages.togglePeerTranslations</a>
* <a href="https://docs.madelineproto.xyz/API_docs/methods/messages.startBot.html" name="messages.startBot">Start a conversation with a bot using a deep linking parameter: messages.startBot</a>

View File

@ -283,6 +283,8 @@ abstract class InternalDoc
}
/**
* Get the file that is currently being played.
*
* Will return a string with the object ID of the stream if we're currently playing a stream, otherwise returns the related LocalFile or RemoteUrl.
*/
public function callGetCurrent(int $id): \danog\MadelineProto\RemoteUrl|\danog\MadelineProto\LocalFile|string|null
{

View File

@ -23,7 +23,6 @@ namespace danog\MadelineProto\Ipc;
use Amp\ByteStream\ReadableStream;
use Amp\Cancellation;
use Amp\DeferredCancellation;
use Amp\DeferredFuture;
use Amp\Ipc\Sync\ChannelledSocket;
use danog\MadelineProto\Exception;
use danog\MadelineProto\FileCallbackInterface;
@ -38,7 +37,6 @@ use Revolt\EventLoop;
use Throwable;
use function Amp\async;
use function Amp\Future\await;
/**
* IPC client.
@ -163,21 +161,10 @@ final class Client extends ClientAbstract
*/
public function callPlay(int $id, LocalFile|RemoteUrl|ReadableStream $file): void
{
$future = null;
$params = [$id, &$file];
if ($file instanceof ReadableStream) {
$deferred = new DeferredFuture;
$file->onClose(function () use ($deferred) {
$deferred->complete();
});
$future = $deferred->getFuture();
}
$wrapper = Wrapper::create($params, $this->session, $this->logger);
$wrapper->wrap($file, true);
$this->__call('callPlay', $wrapper);
if ($future) {
$future->await();
}
$this->__call('callPlayBlocking', $wrapper);
}
/**
@ -186,29 +173,15 @@ final class Client extends ClientAbstract
public function callPlayOnHold(int $id, LocalFile|RemoteUrl|ReadableStream ...$files): void
{
$params = [$id, $files];
$futures = [];
foreach ($files as $file) {
if ($file instanceof ReadableStream) {
$deferred = new DeferredFuture;
$file->onClose(function () use ($deferred) {
$deferred->complete();
});
$futures []= $deferred->getFuture();
}
}
$wrapper = Wrapper::create($params, $this->session, $this->logger);
foreach ($params as &$param) {
if ($param instanceof ReadableStream) {
$wrapper->wrap($param, true);
}
}
$this->__call('callPlayOnHold', $wrapper);
if ($futures) {
await($futures);
}
$this->__call('callPlayOnHoldBlocking', $wrapper);
}
/**
* Upload file from callable.
*

View File

@ -16,19 +16,27 @@
namespace danog\MadelineProto\Ipc\Wrapper;
use AssertionError;
use Revolt\EventLoop;
/**
* @internal
*/
trait ClosableTrait
{
/**
* @var list<Closure(): void>
*/
private array $closeCallbacks = [];
/**
* Closes the resource, marking it as unusable.
* Whether pending operations are aborted or not is implementation dependent.
*/
public function close(): void
{
if ($this->closeCallbacks) {
\array_map(EventLoop::queue(...), $this->closeCallbacks);
$this->closeCallbacks = [];
}
$this->__call('close');
}
@ -49,6 +57,11 @@ trait ClosableTrait
*/
public function onClose(\Closure $onClose): void
{
throw new AssertionError("Not implemented");
$this->closeCallbacks []= $onClose;
}
final public function __destruct()
{
$this->close();
}
}

View File

@ -16,6 +16,7 @@
namespace danog\MadelineProto\Ipc\Wrapper;
use danog\MadelineProto\Ipc\ClientAbstract;
use danog\MadelineProto\Ipc\Wrapper;
/**
@ -30,7 +31,7 @@ abstract class Obj
*
* @param array<string, int> $methods
*/
public function __construct(private Wrapper $wrapper, private array $methods)
public function __construct(private ClientAbstract $wrapper, private array $methods)
{
}
/**

View File

@ -16,11 +16,12 @@
namespace danog\MadelineProto\Ipc\Wrapper;
use Amp\ByteStream\ClosedException;
use Amp\ByteStream\ReadableStream as AmpReadableStream;
use Amp\ByteStream\ReadableStreamIteratorAggregate;
use Amp\Cancellation;
use IteratorAggregate;
use Webmozart\Assert\Assert;
use Revolt\EventLoop;
use function Amp\async;
@ -34,10 +35,25 @@ class ReadableStream extends Obj implements AmpReadableStream, IteratorAggregate
public function read(?Cancellation $cancellation = null): ?string
{
if ($cancellation) {
return async($this->__call(...), 'read')->await($cancellation);
}
return $this->__call('read');
return async(function (): ?string {
$result = null;
try {
$result = $this->__call('read');
} catch (ClosedException $e) {
if ($this->closeCallbacks) {
\array_map(EventLoop::queue(...), $this->closeCallbacks);
$this->closeCallbacks = [];
}
throw $e;
}
if ($result === null) {
if ($this->closeCallbacks) {
\array_map(EventLoop::queue(...), $this->closeCallbacks);
$this->closeCallbacks = [];
}
}
return $result;
})->await($cancellation);
}
/**

View File

@ -300,7 +300,7 @@ final class DjLoop extends VoIPLoop
}
/**
* Get info about the audio currently being played.
*
*
* Will return a string with the object ID of the stream if we're currently playing a stream, otherwise returns the related LocalFile or RemoteUrl.
*/
public function getCurrent(): LocalFile|RemoteUrl|string|null

View File

@ -135,7 +135,7 @@ final class VoIP extends Update implements SimpleFilters
/**
* Get the file that is currently being played.
*
*
* Will return a string with the object ID of the stream if we're currently playing a stream, otherwise returns the related LocalFile or RemoteUrl.
*/
public function getCurrent(): RemoteUrl|LocalFile|string|null

View File

@ -154,6 +154,24 @@ trait AuthKeyHandler
($this->calls[$id] ?? null)?->play($file);
}
/**
* Play file in call, blocking until the file has finished playing if a stream is provided.
*
* @internal
*/
public function callPlayBlocking(int $id, LocalFile|RemoteUrl|ReadableStream $file): void
{
if (!isset($this->calls[$id])) {
return;
}
$this->calls[$id]->play($file);
if ($file instanceof ReadableStream) {
$deferred = new DeferredFuture;
$file->onClose($deferred->complete(...));
$deferred->getFuture()->await();
}
}
/**
* When called, skips to the next file in the playlist.
*/
@ -178,9 +196,29 @@ trait AuthKeyHandler
($this->calls[$id] ?? null)?->playOnHold(...$files);
}
/**
* Play files on hold in call.
*
* @internal
*/
public function callPlayOnHoldBlocking(int $id, LocalFile|RemoteUrl|ReadableStream ...$files): void
{
if (!isset($this->calls[$id])) {
return;
}
$this->calls[$id]->playOnHold(...$files);
foreach ($files as $file) {
if ($file instanceof ReadableStream) {
$deferred = new DeferredFuture;
$file->onClose($deferred->complete(...));
$deferred->getFuture()->await();
}
}
}
/**
* Get the file that is currently being played.
*
*
* Will return a string with the object ID of the stream if we're currently playing a stream, otherwise returns the related LocalFile or RemoteUrl.
*/
public function callGetCurrent(int $id): RemoteUrl|LocalFile|string|null

View File

@ -509,9 +509,9 @@ final class VoIPController
}
}
public function log(string $message): void
public function log(string $message, int $level = Logger::NOTICE): void
{
EventLoop::queue($this->API->logger->logger(...), $message);
EventLoop::queue($this->API->logger->logger(...), $message, $level);
}
private bool $muted = true;
/**