1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-26 12:04:50 +01:00

Update for Stream to Pipeline changes

This commit is contained in:
Aaron Piotrowski 2020-08-28 12:26:11 -05:00
parent c092fdf183
commit dee41676fb
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
9 changed files with 18 additions and 94 deletions

View File

@ -2,12 +2,12 @@
namespace Amp\Postgres;
use Amp\Pipeline;
use Amp\Promise;
use Amp\Stream;
final class ConnectionListener implements Listener
{
/** @var Stream */
/** @var Pipeline */
private $stream;
/** @var string */
@ -17,13 +17,13 @@ final class ConnectionListener implements Listener
private $unlisten;
/**
* @param Stream $stream Stream emitting notificatons on the channel.
* @param Pipeline $pipeline Pipeline emitting notificatons on the channel.
* @param string $channel Channel name.
* @param callable(string $channel): $unlisten Function invoked to unlisten from the channel.
*/
public function __construct(Stream $stream, string $channel, callable $unlisten)
public function __construct(Pipeline $pipeline, string $channel, callable $unlisten)
{
$this->stream = $stream;
$this->stream = $pipeline;
$this->channel = $channel;
$this->unlisten = $unlisten;
}
@ -52,22 +52,6 @@ final class ConnectionListener implements Listener
$this->unlisten();
}
/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal): void
{
$this->stream->onDisposal($onDisposal);
}
/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion): void
{
$this->stream->onCompletion($onCompletion);
}
/**
* @return string Channel name.
*/

View File

@ -2,10 +2,10 @@
namespace Amp\Postgres;
use Amp\Pipeline;
use Amp\Promise;
use Amp\Stream;
interface Listener extends Stream
interface Listener extends Pipeline
{
/**
* @return Promise<Notification|null>

View File

@ -4,13 +4,13 @@ namespace Amp\Postgres;
use Amp\Deferred;
use Amp\Loop;
use Amp\PipelineSource;
use Amp\Promise;
use Amp\Sql\Common\CommandResult;
use Amp\Sql\ConnectionException;
use Amp\Sql\FailureException;
use Amp\Sql\QueryError;
use Amp\Sql\Result;
use Amp\StreamSource;
use Amp\Struct;
use Amp\Success;
use function Amp\call;
@ -44,7 +44,7 @@ final class PgSqlHandle implements Handle
/** @var string */
private $await;
/** @var StreamSource[] */
/** @var PipelineSource[] */
private $listeners = [];
/** @var object[] Anonymous class using Struct trait. */
@ -463,7 +463,7 @@ final class PgSqlHandle implements Handle
throw new QueryError(\sprintf("Already listening on channel '%s'", $channel));
}
$this->listeners[$channel] = $source = new StreamSource;
$this->listeners[$channel] = $source = new PipelineSource;
try {
yield $this->query(\sprintf("LISTEN %s", $this->quoteName($channel)));
@ -473,7 +473,7 @@ final class PgSqlHandle implements Handle
}
Loop::enable($this->poll);
return new ConnectionListener($source->stream(), $channel, \Closure::fromCallable([$this, 'unlisten']));
return new ConnectionListener($source->pipe(), $channel, \Closure::fromCallable([$this, 'unlisten']));
});
}

View File

@ -55,7 +55,6 @@ final class PgSqlResultSet implements Result
\pg_free_result($handle);
}
});
$this->generator->getReturn(); // Force generator to start execution.
}
/**
@ -74,22 +73,6 @@ final class PgSqlResultSet implements Result
$this->generator->dispose();
}
/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal): void
{
$this->generator->onDisposal($onDisposal);
}
/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion): void
{
$this->generator->onCompletion($onCompletion);
}
/**
* @inheritDoc
*/

View File

@ -40,16 +40,6 @@ final class PooledListener implements Listener
$this->listener->dispose();
}
public function onDisposal(callable $onDisposal): void
{
$this->listener->onDisposal($onDisposal);
}
public function onCompletion(callable $onCompletion): void
{
$this->listener->onCompletion($onCompletion);
}
public function getChannel(): string
{
return $this->listener->getChannel();

View File

@ -53,22 +53,6 @@ final class PqBufferedResultSet implements Result
$this->generator->dispose();
}
/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal): void
{
$this->generator->onDisposal($onDisposal);
}
/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion): void
{
$this->generator->onCompletion($onCompletion);
}
/**
* @inheritDoc
*/

View File

@ -5,13 +5,13 @@ namespace Amp\Postgres;
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Loop;
use Amp\PipelineSource;
use Amp\Promise;
use Amp\Sql\Common\CommandResult;
use Amp\Sql\ConnectionException;
use Amp\Sql\FailureException;
use Amp\Sql\QueryError;
use Amp\Sql\Result;
use Amp\StreamSource;
use Amp\Struct;
use Amp\Success;
use pq;
@ -34,7 +34,7 @@ final class PqHandle implements Handle
/** @var string */
private $await;
/** @var StreamSource[] */
/** @var PipelineSource[] */
private $listeners;
/** @var object[] Anonymous class using Struct trait. */
@ -469,7 +469,7 @@ final class PqHandle implements Handle
throw new QueryError(\sprintf("Already listening on channel '%s'", $channel));
}
$this->listeners[$channel] = $source = new StreamSource;
$this->listeners[$channel] = $source = new PipelineSource;
try {
yield from $this->send(
@ -490,7 +490,7 @@ final class PqHandle implements Handle
}
Loop::enable($this->poll);
return new ConnectionListener($source->stream(), $channel, \Closure::fromCallable([$this, 'unlisten']));
return new ConnectionListener($source->pipe(), $channel, \Closure::fromCallable([$this, 'unlisten']));
});
}

View File

@ -39,7 +39,6 @@ final class PqUnbufferedResultSet implements Result
}
}
});
$this->generator->getReturn(); // Force generator to start execution.
}
public function getNextResult(): Promise
@ -63,22 +62,6 @@ final class PqUnbufferedResultSet implements Result
$this->generator->dispose();
}
/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal): void
{
$this->generator->onDisposal($onDisposal);
}
/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion): void
{
$this->generator->onCompletion($onCompletion);
}
/**
* @inheritDoc
*/

View File

@ -6,6 +6,7 @@ use Amp\Coroutine;
use Amp\Delayed;
use Amp\Loop;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Pipeline;
use Amp\Postgres\Link;
use Amp\Postgres\Listener;
use Amp\Postgres\QueryExecutionError;
@ -16,7 +17,6 @@ use Amp\Sql\Result;
use Amp\Sql\Statement;
use Amp\Sql\Transaction as SqlTransaction;
use Amp\Sql\TransactionError;
use Amp\Stream;
abstract class AbstractLinkTest extends AsyncTestCase
{
@ -433,8 +433,8 @@ abstract class AbstractLinkTest extends AsyncTestCase
$results = [];
$results[] = yield Stream\toArray(yield $statement1->execute([$data[0]]));
$results[] = yield Stream\toArray(yield $statement2->execute(['domain' => $data[0]]));
$results[] = yield Pipeline\toArray(yield $statement1->execute([$data[0]]));
$results[] = yield Pipeline\toArray(yield $statement2->execute(['domain' => $data[0]]));
foreach ($results as $result) {
/** @var Result $result */