From 23129d66b5f725b3639aeb3f5173de920f87b6ae Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 19 Sep 2016 11:12:32 -0500 Subject: [PATCH] Add listen support --- example/listen.php | 28 ++++++++++++++++++ lib/AbstractConnection.php | 9 +++++- lib/AbstractPool.php | 21 ++++++++++++++ lib/Connection.php | 9 ++++++ lib/Listener.php | 34 ++++++++++++++++++++++ lib/Notification.php | 18 ++++++++++++ lib/PgSqlExecutor.php | 59 ++++++++++++++++++++++++++++++++++---- lib/PqExecutor.php | 58 +++++++++++++++++++++++++++++++++---- 8 files changed, 225 insertions(+), 11 deletions(-) create mode 100644 example/listen.php create mode 100644 lib/Listener.php create mode 100644 lib/Notification.php diff --git a/example/listen.php b/example/listen.php new file mode 100644 index 0000000..1bf3786 --- /dev/null +++ b/example/listen.php @@ -0,0 +1,28 @@ +#!/usr/bin/env php +listen("test"); + + yield $connection->query("NOTIFY test, 'Data 1'"); + yield $connection->query("NOTIFY test, 'Data 2'"); + + while (yield $listener->next()) { + /** @var \Amp\Postgres\Notification $notification */ + $notification = $listener->getCurrent(); + \printf( + "Received notification from PID %d on channel %s with payload: %s\n", + $notification->pid, + $notification->channel, + $notification->payload + ); + } +}); diff --git a/lib/AbstractConnection.php b/lib/AbstractConnection.php index 42501a0..7fde656 100644 --- a/lib/AbstractConnection.php +++ b/lib/AbstractConnection.php @@ -75,7 +75,14 @@ abstract class AbstractConnection implements Connection { * {@inheritdoc} */ public function prepare(string $sql): Awaitable { - return new Coroutine($this->send([$this->executor, "prepare"], $sql, $sql)); + return new Coroutine($this->send([$this->executor, "prepare"], $sql)); + } + + /** + * {@inheritdoc} + */ + public function listen(string $channel): Awaitable { + return new Coroutine($this->send([$this->executor, "listen"], $channel)); } /** diff --git a/lib/AbstractPool.php b/lib/AbstractPool.php index 32075e5..192c4ff 100644 --- a/lib/AbstractPool.php +++ b/lib/AbstractPool.php @@ -192,6 +192,27 @@ abstract class AbstractPool implements Pool { return $statement; } + + /** + * {@inheritdoc} + */ + public function listen(string $channel): Awaitable { + return new Coroutine($this->doListen($channel)); + } + + public function doListen(string $channel): \Generator { + /** @var \Amp\Postgres\Connection $connection */ + $connection = yield from $this->pop(); + + try { + /** @var \Amp\Postgres\Statement $statement */ + $listener = yield $connection->listen($channel); + } finally { + $this->push($connection); + } + + return $listener; + } /** * {@inheritdoc} diff --git a/lib/Connection.php b/lib/Connection.php index 5ed7873..b9a79ce 100644 --- a/lib/Connection.php +++ b/lib/Connection.php @@ -13,4 +13,13 @@ interface Connection extends Executor { * @throws \Amp\Postgres\FailureException */ public function transaction(int $isolation = Transaction::COMMITTED): Awaitable; + + /** + * @param string $channel Channel name. + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\Listener> + * + * @throws \Amp\Postgres\FailureException + */ + public function listen(string $channel): Awaitable; } diff --git a/lib/Listener.php b/lib/Listener.php new file mode 100644 index 0000000..e978c3d --- /dev/null +++ b/lib/Listener.php @@ -0,0 +1,34 @@ +channel = $channel; + $this->unlisten = $unlisten; + } + + /** + * Unlistens from the channel. No more values will be emitted on theis channel. + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\CommandResult> + */ + public function unlisten(): Awaitable { + return ($this->unlisten)($this->channel); + } +} diff --git a/lib/Notification.php b/lib/Notification.php new file mode 100644 index 0000000..22a3305 --- /dev/null +++ b/lib/Notification.php @@ -0,0 +1,18 @@ +handle = $handle; $deferred = &$this->delayed; + $listeners = &$this->listeners; - $this->poll = Loop::onReadable($socket, static function ($watcher) use (&$deferred, $handle) { + $this->poll = Loop::onReadable($socket, static function ($watcher) use (&$deferred, &$listeners, $handle) { if (!\pg_consume_input($handle)) { Loop::disable($watcher); $deferred->fail(new FailureException(\pg_last_error($handle))); return; } + + while ($result = \pg_get_notify($handle)) { + $channel = $result['message']; + if (isset($listeners[$channel])) { + $notification = new Notification; + $notification->channel = $channel; + $notification->pid = $result['pid']; + $notification->payload = $result['payload']; + $listeners[$channel]->emit($notification); + } + } if (!\pg_connection_busy($handle)) { Loop::disable($watcher); $deferred->resolve(\pg_get_result($handle)); - return; } - - // Reading not done, listen again. }); $this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, $handle) { @@ -71,6 +86,7 @@ class PgSqlExecutor implements Executor { $this->createResult = $this->callableFromInstanceMethod("createResult"); $this->executeCallback = $this->callableFromInstanceMethod("sendExecute"); + $this->unlisten = $this->callableFromInstanceMethod("unlisten"); } /** @@ -187,4 +203,37 @@ class PgSqlExecutor implements Executor { return new PgSqlStatement($sql, $this->executeCallback); }); } + + /** + * {@inheritdoc} + */ + public function listen(string $channel): Awaitable { + return pipe($this->query(\sprintf("LISTEN %s", $channel)), function (CommandResult $result) use ($channel) { + $postponed = new Postponed; + $this->listeners[$channel] = $postponed; + return new Listener($postponed->getObservable(), $channel, $this->unlisten); + }); + } + + /** + * @param string $channel + * + * @return \Interop\Async\Awaitable + * + * @throws \Error + */ + private function unlisten(string $channel): Awaitable { + if (!isset($this->listeners[$channel])) { + throw new \Error("Not listening on that channel"); + } + + $postponed = $this->listeners[$channel]; + unset($this->listeners[$channel]); + + $awaitable = $this->query(\sprintf("UNLISTEN %s", $channel)); + $awaitable->when(function () use ($postponed) { + $postponed->resolve(); + }); + return $awaitable; + } } diff --git a/lib/PqExecutor.php b/lib/PqExecutor.php index 23b3f89..4718510 100644 --- a/lib/PqExecutor.php +++ b/lib/PqExecutor.php @@ -2,7 +2,7 @@ namespace Amp\Postgres; -use Amp\{ CallableMaker, Coroutine, Deferred, function pipe }; +use Amp\{ CallableMaker, Coroutine, Deferred, Postponed, function pipe }; use Interop\Async\{ Awaitable, Loop }; use pq; @@ -23,6 +23,9 @@ class PqExecutor implements Executor { /** @var string */ private $await; + + /** @var \Amp\Postponed[] */ + private $listeners; /** @var callable */ private $send; @@ -30,6 +33,9 @@ class PqExecutor implements Executor { /** @var callable */ private $fetch; + /** @var callable */ + private $unlisten; + /** @var callable */ private $release; @@ -51,10 +57,7 @@ class PqExecutor implements Executor { if (!$handle->busy) { $deferred->resolve($handle->getResult()); - return; } - - // Reading not done, listen again. }); $this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use (&$deferred, $handle) { @@ -70,6 +73,7 @@ class PqExecutor implements Executor { $this->send = $this->callableFromInstanceMethod("send"); $this->fetch = $this->callableFromInstanceMethod("fetch"); + $this->unlisten = $this->callableFromInstanceMethod("unlisten"); $this->release = $this->callableFromInstanceMethod("release"); } @@ -176,7 +180,7 @@ class PqExecutor implements Executor { } switch ($result->status) { - case pq\Result::TUPLES_OK: // No more rows in result set. + case pq\Result::TUPLES_OK: // End of result set. return null; case pq\Result::SINGLE_TUPLE: @@ -213,4 +217,48 @@ class PqExecutor implements Executor { public function prepare(string $sql): Awaitable { return new Coroutine($this->send([$this->handle, "prepareAsync"], $sql, $sql)); } + + /** + * {@inheritdoc} + */ + public function listen(string $channel): Awaitable { + $postponed = new Postponed; + $awaitable = new Coroutine($this->send( + [$this->handle, "listenAsync"], + $channel, + static function (string $channel, string $message, int $pid) use ($postponed) { + $notification = new Notification; + $notification->channel = $channel; + $notification->pid = $pid; + $notification->payload = $message; + $postponed->emit($notification); + })); + + return pipe($awaitable, function () use ($postponed, $channel) { + $this->listeners[$channel] = $postponed; + return new Listener($postponed->getObservable(), $channel, $this->unlisten); + }); + } + + /** + * @param string $channel + * + * @return \Interop\Async\Awaitable + * + * @throws \Error + */ + private function unlisten(string $channel): Awaitable { + if (!isset($this->listeners[$channel])) { + throw new \Error("Not listening on that channel"); + } + + $postponed = $this->listeners[$channel]; + unset($this->listeners[$channel]); + + $awaitable = new Coroutine($this->send([$this->handle, "unlistenAsync"], $channel)); + $awaitable->when(function () use ($postponed) { + $postponed->resolve(); + }); + return $awaitable; + } }