1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-30 04:29:12 +01:00

Better variable names; fix incorrect method name

This commit is contained in:
Aaron Piotrowski 2017-01-18 12:29:48 -06:00
parent fbd3b8b369
commit e3e3def9be
3 changed files with 17 additions and 17 deletions

View File

@ -119,7 +119,7 @@ abstract class AbstractConnection implements Connection {
throw new \Error("Invalid transaction type");
}
return pipe($promise, function (CommandResult $result) use ($isolation): Transaction {
return pipe($promise, function () use ($isolation): Transaction {
$this->busy = new Deferred;
$transaction = new Transaction($this->executor, $isolation);
$transaction->onComplete($this->release);

View File

@ -225,11 +225,11 @@ class PgSqlExecutor implements Executor {
* {@inheritdoc}
*/
public function listen(string $channel): Promise {
return pipe($this->query(\sprintf("LISTEN %s", $channel)), function (CommandResult $result) use ($channel) {
$postponed = new Emitter;
$this->listeners[$channel] = $postponed;
return pipe($this->query(\sprintf("LISTEN %s", $channel)), function () use ($channel): Listener {
$emitter = new Emitter;
$this->listeners[$channel] = $emitter;
Loop::enable($this->poll);
return new Listener($postponed->getStream(), $channel, $this->unlisten);
return new Listener($emitter->stream(), $channel, $this->unlisten);
});
}
@ -245,7 +245,7 @@ class PgSqlExecutor implements Executor {
throw new \Error("Not listening on that channel");
}
$postponed = $this->listeners[$channel];
$emitter = $this->listeners[$channel];
unset($this->listeners[$channel]);
if (empty($this->listeners) && $this->deferred === null) {
@ -253,8 +253,8 @@ class PgSqlExecutor implements Executor {
}
$promise = $this->query(\sprintf("UNLISTEN %s", $channel));
$promise->when(function () use ($postponed) {
$postponed->resolve();
$promise->when(function () use ($emitter) {
$emitter->resolve();
});
return $promise;
}

View File

@ -236,22 +236,22 @@ class PqExecutor implements Executor {
* {@inheritdoc}
*/
public function listen(string $channel): Promise {
$postponed = new Emitter;
$emitter = new Emitter;
$promise = new Coroutine($this->send(
[$this->handle, "listenAsync"],
$channel,
static function (string $channel, string $message, int $pid) use ($postponed) {
static function (string $channel, string $message, int $pid) use ($emitter) {
$notification = new Notification;
$notification->channel = $channel;
$notification->pid = $pid;
$notification->payload = $message;
$postponed->emit($notification);
$emitter->emit($notification);
}));
return pipe($promise, function () use ($postponed, $channel) {
$this->listeners[$channel] = $postponed;
return pipe($promise, function () use ($emitter, $channel): Listener {
$this->listeners[$channel] = $emitter;
Loop::enable($this->poll);
return new Listener($postponed->getStream(), $channel, $this->unlisten);
return new Listener($emitter->stream(), $channel, $this->unlisten);
});
}
@ -267,7 +267,7 @@ class PqExecutor implements Executor {
throw new \Error("Not listening on that channel");
}
$postponed = $this->listeners[$channel];
$emitter = $this->listeners[$channel];
unset($this->listeners[$channel]);
if (empty($this->listeners) && $this->deferred === null) {
@ -275,8 +275,8 @@ class PqExecutor implements Executor {
}
$promise = new Coroutine($this->send([$this->handle, "unlistenAsync"], $channel));
$promise->when(function () use ($postponed) {
$postponed->resolve();
$promise->when(function () use ($emitter) {
$emitter->resolve();
});
return $promise;
}