mirror of
https://github.com/danog/MadelineProto.git
synced 2024-11-26 19:24:42 +01:00
Fix IPC issue
This commit is contained in:
parent
c8926ff1dc
commit
049d234284
@ -81,6 +81,31 @@ final class Client extends ClientAbstract
|
||||
self::$instances[$session->getSessionDirectoryPath()] = $this;
|
||||
EventLoop::queue($this->loopInternal(...));
|
||||
}
|
||||
|
||||
/**
|
||||
* Call function.
|
||||
*
|
||||
* @param string|int $function Function name
|
||||
* @param array|Wrapper $arguments Arguments
|
||||
*/
|
||||
public function __call(string|int $function, array|Wrapper $arguments)
|
||||
{
|
||||
if (\is_array($arguments) && $arguments) {
|
||||
foreach ($arguments as &$arg) {
|
||||
if ($arg instanceof Cancellation) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if ($arg instanceof Cancellation) {
|
||||
$wrapper = Wrapper::create($arguments, $this->session, $this->logger);
|
||||
$wrapper->wrap($arg);
|
||||
unset($arg, $arguments);
|
||||
$arguments = $wrapper;
|
||||
}
|
||||
}
|
||||
return parent::__call($function, $arguments);
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
public function getSession(): SessionPaths
|
||||
{
|
||||
|
@ -28,6 +28,7 @@ use danog\MadelineProto\Ipc\Wrapper\Obj;
|
||||
use danog\MadelineProto\Ipc\Wrapper\ReadableStream;
|
||||
use danog\MadelineProto\Ipc\Wrapper\SeekableReadableStream;
|
||||
use danog\MadelineProto\Ipc\Wrapper\SeekableWritableStream;
|
||||
use danog\MadelineProto\Ipc\Wrapper\WrappedCancellation;
|
||||
use danog\MadelineProto\Ipc\Wrapper\WritableStream;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\SessionPaths;
|
||||
@ -108,6 +109,9 @@ final class Wrapper extends ClientAbstract
|
||||
public function wrap(mixed &$callback, bool $wrapObjects = true): void
|
||||
{
|
||||
if (\is_object($callback) && $wrapObjects) {
|
||||
if ($callback instanceof Cancellation) {
|
||||
$callback = new WrappedCancellation($callback);
|
||||
}
|
||||
if ($callback instanceof FileCallbackInterface) {
|
||||
$file = $callback->getFile();
|
||||
if ($file instanceof ByteStreamReadableStream) {
|
||||
@ -128,7 +132,7 @@ final class Wrapper extends ClientAbstract
|
||||
$class = method_exists($callback, 'seek') ? SeekableWritableStream::class : WritableStream::class;
|
||||
} elseif ($callback instanceof FileCallbackInterface) {
|
||||
$class = FileCallback::class;
|
||||
} elseif ($callback instanceof Cancellation) {
|
||||
} elseif ($callback instanceof WrappedCancellation) {
|
||||
$class = WrapperCancellation::class;
|
||||
}
|
||||
if (!$class) {
|
||||
|
@ -18,6 +18,7 @@ namespace danog\MadelineProto\Ipc\Wrapper;
|
||||
|
||||
use Amp\Cancellation as AmpCancellation;
|
||||
use Amp\CancelledException;
|
||||
use Revolt\EventLoop;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
@ -37,7 +38,15 @@ final class Cancellation extends Obj implements AmpCancellation
|
||||
*/
|
||||
public function subscribe(\Closure $callback): string
|
||||
{
|
||||
return $this->__call('unsubscribe', [$callback]);
|
||||
$id = $this->__call('getId');
|
||||
EventLoop::queue(function () use ($id, $callback): void {
|
||||
try {
|
||||
$this->__call('wait', [$id]);
|
||||
} catch (CancelledException $e) {
|
||||
$callback($e);
|
||||
} catch (\Throwable) {}
|
||||
});
|
||||
return $id;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -47,7 +56,7 @@ final class Cancellation extends Obj implements AmpCancellation
|
||||
*/
|
||||
public function unsubscribe(string $id): void
|
||||
{
|
||||
$this->__call('unsubscribe', [$id]);
|
||||
EventLoop::queue($this->__call(...), 'unsubscribe', [$id]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user