1
0
mirror of https://github.com/danog/MadelineProto.git synced 2025-01-22 16:31:11 +01:00

Improve API

This commit is contained in:
Daniil Gentili 2023-08-14 17:16:59 +02:00
parent 23702168b1
commit 4b9ade0fea
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
7 changed files with 133 additions and 103 deletions

View File

@ -33,6 +33,7 @@ use danog\MadelineProto\EventHandler\Message\Service\DialogPhotoChanged;
use danog\MadelineProto\EventHandler\SimpleFilter\FromAdmin;
use danog\MadelineProto\EventHandler\SimpleFilter\Incoming;
use danog\MadelineProto\EventHandler\SimpleFilter\IsReply;
use danog\MadelineProto\LocalFile;
use danog\MadelineProto\Logger;
use danog\MadelineProto\ParseMode;
use danog\MadelineProto\Settings;
@ -295,13 +296,13 @@ class MyEventHandler extends SimpleEventHandler
#[FilterCommand('call')]
public function callVoip(Incoming&Message $message): void
{
$this->requestCall($message->senderId)->play(__DIR__.'/../music.ogg');
$this->requestCall($message->senderId)->play(new LocalFile(__DIR__.'/../music.ogg'));
}
#[Handler]
public function handleIncomingCall(VoIP&Incoming $call): void
{
$call->accept()->play(__DIR__.'/../music.ogg');
$call->accept()->play(new LocalFile(__DIR__.'/../music.ogg'));
}
public static function getPluginPaths(): string|array|null

View File

@ -1121,6 +1121,7 @@
<file src="src/MTProto.php">
<DocblockTypeContradiction>
<code><![CDATA[\is_object($this->datacenter)]]></code>
<code><![CDATA[isset($this->TL)]]></code>
<code><![CDATA[isset($this->settings)]]></code>
<code><![CDATA[isset($this->settings)]]></code>
<code><![CDATA[new DataCenter($this, $this->dcList, $this->settings->getConnection())]]></code>
@ -1186,7 +1187,6 @@
</PossiblyInvalidArgument>
<PossiblyInvalidArrayAccess>
<code><![CDATA[$this->getSelf()['bot']]]></code>
<code><![CDATA[$this->getSelf()['id']]]></code>
<code><![CDATA[$this->getSelf()['premium']]]></code>
</PossiblyInvalidArrayAccess>
<PossiblyNullArrayAccess>
@ -1228,7 +1228,6 @@
</RedundantConditionGivenDocblockType>
<UndefinedThisPropertyFetch>
<code><![CDATA[$this->full_chats]]></code>
<code><![CDATA[$this->usernames]]></code>
</UndefinedThisPropertyFetch>
<UnsupportedReferenceUsage>
<code><![CDATA[Lang::$current_lang =& Lang::$lang[$this->settings->getAppInfo()->getLangCode()]]]></code>
@ -1456,7 +1455,6 @@
</PossiblyNullArgument>
<PossiblyUndefinedArrayOffset>
<code><![CDATA[$_SERVER['REQUEST_METHOD']]]></code>
<code><![CDATA[$_SERVER['REQUEST_METHOD']]]></code>
</PossiblyUndefinedArrayOffset>
<PossiblyUndefinedMethod>
<code>read</code>

View File

@ -1313,6 +1313,15 @@ abstract class InternalDoc
{
return \danog\MadelineProto\StrTools::mbSubstr($text, $offset, $length);
}
/**
* Provide a buffered reader for a file, URL or amp stream.
*
* @return Closure(int, ?Cancellation): ?string
*/
public static function openBuffered(\danog\MadelineProto\LocalFile|\danog\MadelineProto\RemoteUrl|\Amp\ByteStream\ReadableStream $stream): \Closure
{
return \danog\MadelineProto\Tools::openBuffered($stream);
}
/**
* Opens a file in append-only mode.
*
@ -1481,7 +1490,7 @@ abstract class InternalDoc
*
* @param mixed $user User
*/
public function requestCall(mixed $user)
public function requestCall(mixed $user): \danog\MadelineProto\VoIP
{
return $this->wrapper->getAPI()->requestCall($user);
}

View File

@ -20,14 +20,8 @@ namespace danog\MadelineProto;
use Amp\ByteStream\ReadableStream;
use Amp\ByteStream\WritableStream;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use AssertionError;
use danog\MadelineProto\Stream\BufferedStreamInterface;
use danog\MadelineProto\Stream\BufferInterface;
use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\Transport\PremadeStream;
use Closure;
use FFI;
use FFI\CData;
use Webmozart\Assert\Assert;
@ -204,11 +198,6 @@ final class Ogg
*/
private int $streamCount;
/**
* Buffered stream interface.
*/
private BufferInterface $stream;
/**
* Pack format.
*/
@ -222,13 +211,15 @@ final class Ogg
public readonly iterable $opusPackets;
/**
* Constructor.
*
* @param BufferedStreamInterface $stream The stream
* @var (Closure(int, ?Cancellation): ?string) $stream The stream
*/
public function __construct(BufferedStreamInterface $stream)
private readonly Closure $stream;
/**
* Constructor.
*/
public function __construct(LocalFile|RemoteUrl|ReadableStream $stream)
{
$this->stream = $stream->getReadBuffer($l);
$this->stream = Tools::openBuffered($stream);
$pack_format = [
'stream_structure_version' => 'C',
'header_type_flag' => 'C',
@ -355,6 +346,9 @@ final class Ogg
/** @psalm-suppress InvalidArgument */
$this->opusPayload .= \substr($content, $preOffset, (int) (($offset - $preOffset) + $sum + $paddingLen));
if ($this->currentDuration === 60_000) {
if (($s = \strlen($this->opusPayload)) > 1024) {
throw new AssertionError("Encountered a packet with size $s > 1024, please convert the audio files using Ogg::convert to avoid issues with packet size!");
}
yield $this->opusPayload;
$this->opusPayload = '';
$this->currentDuration = 0;
@ -376,6 +370,9 @@ final class Ogg
if (\strlen($this->opusPayload) !== \strlen($content)) {
throw new AssertionError();
}
if (($s = \strlen($this->opusPayload)) > 1024) {
throw new AssertionError("Encountered a packet with size $s > 1024, please convert the audio files using Ogg::convert to avoid issues with packet size!");
}
yield $this->opusPayload;
$this->opusPayload = '';
$this->currentDuration = 0;
@ -385,6 +382,14 @@ final class Ogg
}
}
/**
* Validate that the specified file, URL or stream is a valid VoIP OGG OPUS file.
*/
public function validate(LocalFile|RemoteUrl|ReadableStream $file): void
{
foreach ((new self($file))->opusPackets as $_) {
}
}
/**
* Read frames.
*
@ -398,7 +403,7 @@ final class Ogg
$ignoredStreams = [];
while (true) {
$capture = $this->stream->bufferRead(4);
$capture = ($this->stream)(4);
if ($capture !== self::CAPTURE_PATTERN) {
if ($capture === null) {
return;
@ -408,7 +413,7 @@ final class Ogg
$headers = \unpack(
$this->packFormat,
$this->stream->bufferRead(23)
($this->stream)(23)
);
$ignore = \in_array($headers['bitstream_serial_number'], $ignoredStreams, true);
@ -424,7 +429,7 @@ final class Ogg
$segments = \unpack(
'C*',
$this->stream->bufferRead($headers['number_page_segments']),
($this->stream)($headers['number_page_segments']),
);
//$serial = $headers['bitstream_serial_number'];
@ -439,7 +444,7 @@ final class Ogg
foreach ($segments as $segment_size) {
$sizeAccumulated += $segment_size;
if ($segment_size < 255) {
$piece = $this->stream->bufferRead($sizeAccumulated);
$piece = ($this->stream)($sizeAccumulated);
$sizeAccumulated = 0;
if ($ignore) {
continue;
@ -539,33 +544,24 @@ final class Ogg
$checkErr($opus->opus_encoder_ctl($encoder, self::OPUS_SET_BANDWIDTH_REQUEST, self::OPUS_BANDWIDTH_FULLBAND));
$checkErr($opus->opus_encoder_ctl($encoder, self::OPUS_SET_BITRATE_REQUEST, 130*1000));
$in = $wavIn instanceof LocalFile
? openFile($wavIn->file, 'r')
: (
$wavIn instanceof RemoteUrl
? HttpClientBuilder::buildDefault()->request(new Request($wavIn->url))->getBody()
: $wavIn
);
$read = Tools::openBuffered($wavIn);
$ctx = (new ConnectionContext())->addStream(PremadeStream::class, $in)->addStream(SimpleBufferedRawStream::class);
/** @var SimpleBufferedRawStream */
$in = $ctx->getStream();
Assert::eq($in->bufferRead(length: 4), 'RIFF', "A .wav file must be provided!");
$totalLength = \unpack('V', $in->bufferRead(length: 4))[1];
Assert::eq($in->bufferRead(length: 4), 'WAVE', "A .wav file must be provided!");
Assert::eq($read(4), 'RIFF', "A .wav file must be provided!");
$totalLength = \unpack('V', $read(4))[1];
Assert::eq($read(4), 'WAVE', "A .wav file must be provided!");
do {
$type = $in->bufferRead(length: 4);
$length = \unpack('V', $in->bufferRead(length: 4))[1];
$type = $read(4);
$length = \unpack('V', $read(4))[1];
if ($type === 'fmt ') {
Assert::eq($length, 16);
$contents = $in->bufferRead(length: $length + ($length % 2));
$contents = $read($length + ($length % 2));
$header = \unpack('vaudioFormat/vchannels/VsampleRate/VbyteRate/vblockAlign/vbitsPerSample', $contents);
Assert::eq($header['audioFormat'], 1, "The wav file must contain PCM audio");
Assert::eq($header['sampleRate'], 48000, "The sample rate of the wav file must be 48khz!");
} elseif ($type === 'data') {
break;
} else {
$in->bufferRead($length);
$read($length);
}
} while (true);
@ -636,6 +632,7 @@ final class Ogg
$writeTag("MadelineProto ".API::RELEASE.", ".$opus->opus_get_version_string());
$tags .= \pack('V', 2);
$writeTag("ENCODER=MadelineProto ".API::RELEASE." with ".$opus->opus_get_version_string());
$writeTag("MADELINE_ENCODER_V=1");
$writeTag('See https://docs.madelineproto.xyz/docs/CALLS.html for more info');
$writePage(
0,
@ -648,7 +645,7 @@ final class Ogg
$granule = 0;
$buf = FFI::cast(FFI::type('char*'), FFI::addr($opus->new('char[1024]')));
do {
$chunkOrig = $in->bufferRead(length: $chunkSize);
$chunkOrig = $read($chunkSize);
$chunk = \str_pad($chunkOrig, $chunkSize, "\0");
$granuleDiff = \strlen($chunk) >> $shift;
$len = $opus->opus_encode($encoder, $chunk, $granuleDiff, $buf, 1024);

View File

@ -21,7 +21,11 @@ declare(strict_types=1);
namespace danog\MadelineProto;
use Amp\ByteStream\ReadableBuffer;
use Amp\ByteStream\ReadableStream;
use Amp\Cancellation;
use Amp\File\File;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use ArrayAccess;
use Closure;
use Countable;
@ -600,6 +604,41 @@ abstract class Tools extends AsyncTools
return openFile($path, "a");
}
/**
* Provide a buffered reader for a file, URL or amp stream.
*
* @return Closure(int, ?Cancellation): ?string
*/
public static function openBuffered(LocalFile|RemoteUrl|ReadableStream $stream): Closure
{
if ($stream instanceof LocalFile) {
$stream = openFile($stream->file, 'r');
return fn (int $len, ?Cancellation $cancellation = null): ?string => $stream->read(cancellation: $cancellation, length: $len);
}
if ($stream instanceof RemoteUrl) {
$stream = HttpClientBuilder::buildDefault()->request(new Request($stream->url))->getBody();
}
$buffer = '';
return function (int $len, ?Cancellation $cancellation = null) use (&$buffer, $stream): ?string {
if ($buffer === null) {
return null;
}
do {
if (\strlen($buffer) >= $len) {
$piece = \substr($buffer, 0, $len);
$buffer = \substr($buffer, $len);
return $piece;
}
$chunk = $stream->read($cancellation);
if ($chunk === null) {
$buffer = null;
return null;
}
$buffer .= $chunk;
} while (true);
};
}
private const BLOCKING_FUNCTIONS = [
'file_get_contents' => 'https://github.com/amphp/file, https://github.com/amphp/http-client or $this->fileGetContents()',
'file_put_contents' => 'https://github.com/amphp/file',

View File

@ -55,7 +55,7 @@ trait AuthKeyHandler
*
* @param mixed $user User
*/
public function requestCall(mixed $user)
public function requestCall(mixed $user): VoIP
{
$user = ($this->getInfo($user));
if (!isset($user['InputUser']) || $user['InputUser']['_'] === 'inputUserSelf') {

View File

@ -19,12 +19,7 @@
namespace danog\MadelineProto;
use Amp\ByteStream\ReadableStream;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use AssertionError;
use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\Stream\Common\FileBufferedStream;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\VoIP\CallState;
use danog\MadelineProto\VoIP\DiscardReason;
use danog\MadelineProto\VoIP\Endpoint;
@ -37,7 +32,6 @@ use Throwable;
use Webmozart\Assert\Assert;
use function Amp\delay;
use function Amp\File\openFile;
/** @internal */
final class VoIPController
@ -104,9 +98,9 @@ final class VoIPController
private array $call;
/** @var array<string|LocalFile|RemoteUrl|ReadableStream> */
/** @var array<LocalFile|RemoteUrl|ReadableStream> */
private array $holdFiles = [];
/** @var list<string|LocalFile|RemoteUrl|ReadableStream> */
/** @var list<LocalFile|RemoteUrl|ReadableStream> */
private array $inputFiles = [];
private int $holdIndex = 0;
@ -468,26 +462,25 @@ final class VoIPController
}
private bool $muted = false;
private bool $playingHold = false;
private function pullPacket(): bool
private function pullPacket(): ?string
{
$file = \array_shift($this->inputFiles);
if ($file) {
$this->playingHold = false;
} else {
$this->playingHold = true;
if (!$this->holdFiles) {
return false;
if ($this->packetQueue->isEmpty()) {
$file = \array_shift($this->inputFiles);
if ($file) {
$this->playingHold = false;
} else {
$this->playingHold = true;
if (!$this->holdFiles) {
return null;
}
$file = $this->holdFiles[($this->holdIndex++) % \count($this->holdFiles)];
}
$file = $this->holdFiles[($this->holdIndex++) % \count($this->holdFiles)];
}
$it = $this->openFile($file);
foreach ($it->opusPackets as $packet) {
if (($s = \strlen($packet)) > 1024) {
throw new AssertionError("Encountered a packet with size $s > 1024, please convert the audio files using Ogg::convert to avoid issues with packet size!");
$it = new Ogg($file);
foreach ($it->opusPackets as $packet) {
$this->packetQueue->enqueue($packet);
}
$this->packetQueue->enqueue($packet);
}
return false;
return $this->packetQueue->dequeue();
}
/**
* Start write loop.
@ -506,26 +499,15 @@ final class VoIPController
$delay = $this->muted ? 0.2 : 0.06;
$t = \microtime(true) + $delay;
while (true) {
if ($this->packetQueue->isEmpty() && !$this->pullPacket()) {
if (!$this->muted) {
$this->bestEndpoint->writeReliably([
'_' => self::PKT_STREAM_STATE,
'id' => 0,
'enabled' => false
]);
$this->muted = true;
$delay = 0.2;
}
$packet = $this->messageHandler->encryptPacket([
'_' => self::PKT_NOP
]);
} else {
if ($packet = $this->pullPacket()) {
if ($this->muted) {
$this->bestEndpoint->writeReliably([
if (!$this->bestEndpoint->writeReliably([
'_' => self::PKT_STREAM_STATE,
'id' => 0,
'enabled' => true
]);
])) {
return;
}
$this->muted = false;
$delay = 0.06;
$this->opusTimestamp = 0;
@ -533,19 +515,38 @@ final class VoIPController
$packet = $this->messageHandler->encryptPacket([
'_' => self::PKT_STREAM_DATA,
'stream_id' => 0,
'data' => $this->packetQueue->dequeue(),
'data' => $packet,
'timestamp' => $this->opusTimestamp
]);
$this->opusTimestamp += 60;
} else {
if (!$this->muted) {
if (!$this->bestEndpoint->writeReliably([
'_' => self::PKT_STREAM_STATE,
'id' => 0,
'enabled' => false
])) {
return;
}
$this->muted = true;
$delay = 0.2;
}
$packet = $this->messageHandler->encryptPacket([
'_' => self::PKT_NOP
]);
}
//Logger::log("Writing {$this->opusTimestamp} in $this!");
$cur = \microtime(true);
$diff = $t - $cur;
if ($diff > 0) {
delay($diff);
} else {
EventLoop::queue(Logger::log(...), "We're late while sending audio data!");
}
$this->bestEndpoint->write($packet);
if (!$this->bestEndpoint->write($packet)) {
return;
}
if ($diff > 0) {
$cur += $diff;
@ -555,21 +556,6 @@ final class VoIPController
$t += $delay;
}
}
/**
* Open OGG file for reading.
*/
private function openFile(string|LocalFile|RemoteUrl|ReadableStream $file): Ogg
{
$ctx = new ConnectionContext;
$ctx->addStream(FileBufferedStream::class, match (true) {
\is_string($file) => openFile($file, 'r'),
$file instanceof LocalFile => openFile($file->file, 'r'),
$file instanceof RemoteUrl => HttpClientBuilder::buildDefault()->request(new Request($file->url))->getBody(),
$file instanceof ReadableStream => $file,
});
$stream = $ctx->getStream();
return new Ogg($stream);
}
/**
* Play file.
*/