1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-27 04:24:45 +01:00

Add listen support

This commit is contained in:
Aaron Piotrowski 2016-09-19 11:12:32 -05:00
parent 6d0a380464
commit 23129d66b5
8 changed files with 225 additions and 11 deletions

28
example/listen.php Normal file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Postgres;
Amp\execute(function () {
/** @var \Amp\Postgres\Connection $connection */
$connection = yield Postgres\connect('host=localhost user=postgres');
/** @var \Amp\Postgres\Listener $listener */
$listener = yield $connection->listen("test");
yield $connection->query("NOTIFY test, 'Data 1'");
yield $connection->query("NOTIFY test, 'Data 2'");
while (yield $listener->next()) {
/** @var \Amp\Postgres\Notification $notification */
$notification = $listener->getCurrent();
\printf(
"Received notification from PID %d on channel %s with payload: %s\n",
$notification->pid,
$notification->channel,
$notification->payload
);
}
});

View File

@ -75,7 +75,14 @@ abstract class AbstractConnection implements Connection {
* {@inheritdoc} * {@inheritdoc}
*/ */
public function prepare(string $sql): Awaitable { public function prepare(string $sql): Awaitable {
return new Coroutine($this->send([$this->executor, "prepare"], $sql, $sql)); return new Coroutine($this->send([$this->executor, "prepare"], $sql));
}
/**
* {@inheritdoc}
*/
public function listen(string $channel): Awaitable {
return new Coroutine($this->send([$this->executor, "listen"], $channel));
} }
/** /**

View File

@ -192,6 +192,27 @@ abstract class AbstractPool implements Pool {
return $statement; return $statement;
} }
/**
* {@inheritdoc}
*/
public function listen(string $channel): Awaitable {
return new Coroutine($this->doListen($channel));
}
public function doListen(string $channel): \Generator {
/** @var \Amp\Postgres\Connection $connection */
$connection = yield from $this->pop();
try {
/** @var \Amp\Postgres\Statement $statement */
$listener = yield $connection->listen($channel);
} finally {
$this->push($connection);
}
return $listener;
}
/** /**
* {@inheritdoc} * {@inheritdoc}

View File

@ -13,4 +13,13 @@ interface Connection extends Executor {
* @throws \Amp\Postgres\FailureException * @throws \Amp\Postgres\FailureException
*/ */
public function transaction(int $isolation = Transaction::COMMITTED): Awaitable; public function transaction(int $isolation = Transaction::COMMITTED): Awaitable;
/**
* @param string $channel Channel name.
*
* @return \Interop\Async\Awaitable<\Amp\Postgres\Listener>
*
* @throws \Amp\Postgres\FailureException
*/
public function listen(string $channel): Awaitable;
} }

34
lib/Listener.php Normal file
View File

@ -0,0 +1,34 @@
<?php declare(strict_types = 1);
namespace Amp\Postgres;
use Amp\{ Observable, Observer };
use Interop\Async\Awaitable;
class Listener extends Observer {
/** @var string */
private $channel;
/** @var callable */
private $unlisten;
/**
* @param \Amp\Observable $observable Observable emitting notificatons on the channel.
* @param string $channel Channel name.
* @param callable(string $channel): void $unlisten Function invoked to unlisten from the channel.
*/
public function __construct(Observable $observable, string $channel, callable $unlisten) {
parent::__construct($observable);
$this->channel = $channel;
$this->unlisten = $unlisten;
}
/**
* Unlistens from the channel. No more values will be emitted on theis channel.
*
* @return \Interop\Async\Awaitable<\Amp\Postgres\CommandResult>
*/
public function unlisten(): Awaitable {
return ($this->unlisten)($this->channel);
}
}

18
lib/Notification.php Normal file
View File

@ -0,0 +1,18 @@
<?php declare(strict_types = 1);
namespace Amp\Postgres;
use Amp\Struct;
class Notification {
use Struct;
/** @var string Channel name. */
public $channel;
/** @var int PID of message source. */
public $pid;
/** @var string Message paypload */
public $payload;
}

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres; namespace Amp\Postgres;
use Amp\{ CallableMaker, Coroutine, Deferred, function pipe }; use Amp\{ CallableMaker, Coroutine, Deferred, Postponed, function pipe };
use Interop\Async\{ Awaitable, Loop }; use Interop\Async\{ Awaitable, Loop };
class PgSqlExecutor implements Executor { class PgSqlExecutor implements Executor {
@ -26,6 +26,12 @@ class PgSqlExecutor implements Executor {
/** @var callable */ /** @var callable */
private $createResult; private $createResult;
/** @var \Amp\Postponed[] */
private $listeners = [];
/** @var callable */
private $unlisten;
/** /**
* Connection constructor. * Connection constructor.
* *
@ -36,21 +42,30 @@ class PgSqlExecutor implements Executor {
$this->handle = $handle; $this->handle = $handle;
$deferred = &$this->delayed; $deferred = &$this->delayed;
$listeners = &$this->listeners;
$this->poll = Loop::onReadable($socket, static function ($watcher) use (&$deferred, $handle) { $this->poll = Loop::onReadable($socket, static function ($watcher) use (&$deferred, &$listeners, $handle) {
if (!\pg_consume_input($handle)) { if (!\pg_consume_input($handle)) {
Loop::disable($watcher); Loop::disable($watcher);
$deferred->fail(new FailureException(\pg_last_error($handle))); $deferred->fail(new FailureException(\pg_last_error($handle)));
return; return;
} }
while ($result = \pg_get_notify($handle)) {
$channel = $result['message'];
if (isset($listeners[$channel])) {
$notification = new Notification;
$notification->channel = $channel;
$notification->pid = $result['pid'];
$notification->payload = $result['payload'];
$listeners[$channel]->emit($notification);
}
}
if (!\pg_connection_busy($handle)) { if (!\pg_connection_busy($handle)) {
Loop::disable($watcher); Loop::disable($watcher);
$deferred->resolve(\pg_get_result($handle)); $deferred->resolve(\pg_get_result($handle));
return;
} }
// Reading not done, listen again.
}); });
$this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, $handle) { $this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, $handle) {
@ -71,6 +86,7 @@ class PgSqlExecutor implements Executor {
$this->createResult = $this->callableFromInstanceMethod("createResult"); $this->createResult = $this->callableFromInstanceMethod("createResult");
$this->executeCallback = $this->callableFromInstanceMethod("sendExecute"); $this->executeCallback = $this->callableFromInstanceMethod("sendExecute");
$this->unlisten = $this->callableFromInstanceMethod("unlisten");
} }
/** /**
@ -187,4 +203,37 @@ class PgSqlExecutor implements Executor {
return new PgSqlStatement($sql, $this->executeCallback); return new PgSqlStatement($sql, $this->executeCallback);
}); });
} }
/**
* {@inheritdoc}
*/
public function listen(string $channel): Awaitable {
return pipe($this->query(\sprintf("LISTEN %s", $channel)), function (CommandResult $result) use ($channel) {
$postponed = new Postponed;
$this->listeners[$channel] = $postponed;
return new Listener($postponed->getObservable(), $channel, $this->unlisten);
});
}
/**
* @param string $channel
*
* @return \Interop\Async\Awaitable
*
* @throws \Error
*/
private function unlisten(string $channel): Awaitable {
if (!isset($this->listeners[$channel])) {
throw new \Error("Not listening on that channel");
}
$postponed = $this->listeners[$channel];
unset($this->listeners[$channel]);
$awaitable = $this->query(\sprintf("UNLISTEN %s", $channel));
$awaitable->when(function () use ($postponed) {
$postponed->resolve();
});
return $awaitable;
}
} }

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres; namespace Amp\Postgres;
use Amp\{ CallableMaker, Coroutine, Deferred, function pipe }; use Amp\{ CallableMaker, Coroutine, Deferred, Postponed, function pipe };
use Interop\Async\{ Awaitable, Loop }; use Interop\Async\{ Awaitable, Loop };
use pq; use pq;
@ -23,6 +23,9 @@ class PqExecutor implements Executor {
/** @var string */ /** @var string */
private $await; private $await;
/** @var \Amp\Postponed[] */
private $listeners;
/** @var callable */ /** @var callable */
private $send; private $send;
@ -30,6 +33,9 @@ class PqExecutor implements Executor {
/** @var callable */ /** @var callable */
private $fetch; private $fetch;
/** @var callable */
private $unlisten;
/** @var callable */ /** @var callable */
private $release; private $release;
@ -51,10 +57,7 @@ class PqExecutor implements Executor {
if (!$handle->busy) { if (!$handle->busy) {
$deferred->resolve($handle->getResult()); $deferred->resolve($handle->getResult());
return;
} }
// Reading not done, listen again.
}); });
$this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use (&$deferred, $handle) { $this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use (&$deferred, $handle) {
@ -70,6 +73,7 @@ class PqExecutor implements Executor {
$this->send = $this->callableFromInstanceMethod("send"); $this->send = $this->callableFromInstanceMethod("send");
$this->fetch = $this->callableFromInstanceMethod("fetch"); $this->fetch = $this->callableFromInstanceMethod("fetch");
$this->unlisten = $this->callableFromInstanceMethod("unlisten");
$this->release = $this->callableFromInstanceMethod("release"); $this->release = $this->callableFromInstanceMethod("release");
} }
@ -176,7 +180,7 @@ class PqExecutor implements Executor {
} }
switch ($result->status) { switch ($result->status) {
case pq\Result::TUPLES_OK: // No more rows in result set. case pq\Result::TUPLES_OK: // End of result set.
return null; return null;
case pq\Result::SINGLE_TUPLE: case pq\Result::SINGLE_TUPLE:
@ -213,4 +217,48 @@ class PqExecutor implements Executor {
public function prepare(string $sql): Awaitable { public function prepare(string $sql): Awaitable {
return new Coroutine($this->send([$this->handle, "prepareAsync"], $sql, $sql)); return new Coroutine($this->send([$this->handle, "prepareAsync"], $sql, $sql));
} }
/**
* {@inheritdoc}
*/
public function listen(string $channel): Awaitable {
$postponed = new Postponed;
$awaitable = new Coroutine($this->send(
[$this->handle, "listenAsync"],
$channel,
static function (string $channel, string $message, int $pid) use ($postponed) {
$notification = new Notification;
$notification->channel = $channel;
$notification->pid = $pid;
$notification->payload = $message;
$postponed->emit($notification);
}));
return pipe($awaitable, function () use ($postponed, $channel) {
$this->listeners[$channel] = $postponed;
return new Listener($postponed->getObservable(), $channel, $this->unlisten);
});
}
/**
* @param string $channel
*
* @return \Interop\Async\Awaitable
*
* @throws \Error
*/
private function unlisten(string $channel): Awaitable {
if (!isset($this->listeners[$channel])) {
throw new \Error("Not listening on that channel");
}
$postponed = $this->listeners[$channel];
unset($this->listeners[$channel]);
$awaitable = new Coroutine($this->send([$this->handle, "unlistenAsync"], $channel));
$awaitable->when(function () use ($postponed) {
$postponed->resolve();
});
return $awaitable;
}
} }