1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-26 20:15:02 +01:00

Error on multiple calls to listen on same channel

This commit is contained in:
Aaron Piotrowski 2017-06-13 00:34:20 -05:00
parent b72bd95bae
commit fe659ea5e6
3 changed files with 97 additions and 17 deletions

View File

@ -211,7 +211,7 @@ class PgSqlExecutor implements Executor {
*/ */
public function notify(string $channel, string $payload = ""): Promise { public function notify(string $channel, string $payload = ""): Promise {
if ($payload === "") { if ($payload === "") {
return $this->query(\sprintf("NOTIFY %s")); return $this->query(\sprintf("NOTIFY %s", $channel));
} }
return $this->query(\sprintf("NOTIFY %s, '%s'", $channel, $payload)); return $this->query(\sprintf("NOTIFY %s, '%s'", $channel, $payload));
@ -222,10 +222,19 @@ class PgSqlExecutor implements Executor {
*/ */
public function listen(string $channel): Promise { public function listen(string $channel): Promise {
return call(function () use ($channel) { return call(function () use ($channel) {
yield $this->query(\sprintf("LISTEN %s")); if (isset($this->listeners[$channel])) {
throw new QueryError(\sprintf("Already listening on channel '%s'", $channel));
}
$this->listeners[$channel] = $emitter = new Emitter;
try {
yield $this->query(\sprintf("LISTEN %s", $channel));
} catch (\Throwable $exception) {
unset($this->listeners[$channel]);
throw $exception;
}
$emitter = new Emitter;
$this->listeners[$channel] = $emitter;
Loop::enable($this->poll); Loop::enable($this->poll);
return new Listener($emitter->iterate(), $channel, $this->unlisten); return new Listener($emitter->iterate(), $channel, $this->unlisten);
}); });

View File

@ -230,19 +230,28 @@ class PqExecutor implements Executor {
*/ */
public function listen(string $channel): Promise { public function listen(string $channel): Promise {
return call(function () use ($channel) { return call(function () use ($channel) {
$emitter = new Emitter; if (isset($this->listeners[$channel])) {
yield from $this->send( throw new QueryError(\sprintf("Already listening on channel '%s'", $channel));
[$this->handle, "listenAsync"], }
$channel,
static function (string $channel, string $message, int $pid) use ($emitter) { $this->listeners[$channel] = $emitter = new Emitter;
$notification = new Notification;
$notification->channel = $channel; try {
$notification->pid = $pid; yield from $this->send(
$notification->payload = $message; [$this->handle, "listenAsync"],
$emitter->emit($notification); $channel,
}); static function (string $channel, string $message, int $pid) use ($emitter) {
$notification = new Notification;
$notification->channel = $channel;
$notification->pid = $pid;
$notification->payload = $message;
$emitter->emit($notification);
});
} catch (\Throwable $exception) {
unset($this->listeners[$channel]);
throw $exception;
}
$this->listeners[$channel] = $emitter;
Loop::enable($this->poll); Loop::enable($this->poll);
return new Listener($emitter->iterate(), $channel, $this->unlisten); return new Listener($emitter->iterate(), $channel, $this->unlisten);
}); });

View File

@ -4,7 +4,9 @@ namespace Amp\Postgres\Test;
use Amp\CancellationTokenSource; use Amp\CancellationTokenSource;
use Amp\Loop; use Amp\Loop;
use Amp\Postgres\{ CommandResult, Connection, Transaction, TransactionError, TupleResult }; use Amp\Postgres\{
CommandResult, Connection, Listener, Transaction, TransactionError, TupleResult
};
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
abstract class AbstractConnectionTest extends TestCase { abstract class AbstractConnectionTest extends TestCase {
@ -176,6 +178,66 @@ abstract class AbstractConnectionTest extends TestCase {
}); });
} }
public function testListen() {
Loop::run(function () {
$channel = "test";
/** @var \Amp\Postgres\Listener $listener */
$listener = yield $this->connection->listen($channel);
$this->assertInstanceOf(Listener::class, $listener);
yield $this->connection->query(\sprintf("NOTIFY %s, '%s'", $channel, '0'));
yield $this->connection->query(\sprintf("NOTIFY %s, '%s'", $channel, '1'));
$count = 0;
Loop::defer(function () use (&$count, $listener) {
$listener->unlisten();
$this->assertSame(2, $count);
});
while (yield $listener->advance()) {
$this->assertSame($listener->getCurrent()->payload, (string) $count++);
}
});
}
/**
* @depends testListen
*/
public function testNotify() {
Loop::run(function () {
$channel = "test";
/** @var \Amp\Postgres\Listener $listener */
$listener = yield $this->connection->listen($channel);
yield $this->connection->notify($channel, '0');
yield $this->connection->notify($channel, '1');
$count = 0;
Loop::defer(function () use (&$count, $listener) {
$listener->unlisten();
$this->assertSame(2, $count);
});
while (yield $listener->advance()) {
$this->assertSame($listener->getCurrent()->payload, (string) $count++);
}
});
}
/**
* @depends testListen
* @expectedException \Amp\Postgres\QueryError
* @expectedExceptionMessage Already listening on channel
*/
public function testListenOnSameChannel() {
Loop::run(function () {
$channel = "test";
$listener = yield $this->connection->listen($channel);
$listener = yield $this->connection->listen($channel);
});
}
public function testConnect() { public function testConnect() {
Loop::run(function () { Loop::run(function () {
$connect = $this->getConnectCallable(); $connect = $this->getConnectCallable();