mirror of
https://github.com/danog/amp.git
synced 2025-01-22 21:31:18 +01:00
Update for Revolt changes and Future import
This commit is contained in:
parent
7e30ee0c2c
commit
5b95aa590c
@ -2,8 +2,8 @@
|
||||
|
||||
namespace Amp;
|
||||
|
||||
use Revolt\Future\Future;
|
||||
use function Revolt\Future\spawn;
|
||||
use Amp\Future;
|
||||
use function Amp\Future\spawn;
|
||||
|
||||
/**
|
||||
* @template TValue
|
||||
|
@ -2,12 +2,12 @@
|
||||
|
||||
namespace Amp\Internal;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\DisposedException;
|
||||
use Amp\Future;
|
||||
use Amp\Pipeline;
|
||||
use Revolt\EventLoop\Loop;
|
||||
use Revolt\EventLoop\Suspension;
|
||||
use Revolt\Future\Deferred;
|
||||
use Revolt\Future\Future;
|
||||
|
||||
/**
|
||||
* Class used internally by {@see Pipeline} implementations. Do not use this class in your code, instead compose your
|
||||
|
@ -19,7 +19,7 @@ final class FutureIterator
|
||||
private FutureIteratorQueue $queue;
|
||||
|
||||
/**
|
||||
* @var null|Future<void>|Future<null>|Future<array{Tk, Future<Tv>}>
|
||||
* @var null|Future<void>|Future<array{Tk, Future<Tv>}>
|
||||
*/
|
||||
private ?Future $complete = null;
|
||||
|
||||
|
@ -3,15 +3,15 @@
|
||||
|
||||
namespace Amp\Pipeline;
|
||||
|
||||
use Amp\Future;
|
||||
use Amp\AsyncGenerator;
|
||||
use Amp\Pipeline;
|
||||
use Amp\PipelineSource;
|
||||
use Revolt\Future\Future;
|
||||
use function Amp\Future\all;
|
||||
use function Amp\Future\spawn;
|
||||
use function Amp\Internal\createTypeError;
|
||||
use function Revolt\EventLoop\defer;
|
||||
use function Revolt\EventLoop\delay;
|
||||
use function Revolt\Future\all;
|
||||
use function Revolt\Future\spawn;
|
||||
|
||||
/**
|
||||
* Creates a pipeline from the given iterable, emitting the each value. The iterable may contain promises. If any
|
||||
@ -20,7 +20,7 @@ use function Revolt\Future\spawn;
|
||||
* @template TValue
|
||||
*
|
||||
* @param iterable $iterable Elements to emit.
|
||||
* @param int $delay Delay between elements emitted in milliseconds.
|
||||
* @param float $delay Delay between elements emitted in seconds.
|
||||
*
|
||||
* @psalm-param iterable<TValue> $iterable
|
||||
*
|
||||
@ -30,7 +30,7 @@ use function Revolt\Future\spawn;
|
||||
*
|
||||
* @throws \TypeError If the argument is not an array or instance of \Traversable.
|
||||
*/
|
||||
function fromIterable(iterable $iterable, int $delay = 0): Pipeline
|
||||
function fromIterable(iterable $iterable, float $delay = 0): Pipeline
|
||||
{
|
||||
return new AsyncGenerator(static function () use ($iterable, $delay): \Generator {
|
||||
foreach ($iterable as $value) {
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Amp;
|
||||
|
||||
use Revolt\Future\Future;
|
||||
use Amp\Future;
|
||||
|
||||
/**
|
||||
* PipelineSource is a container for a Pipeline that can emit values using the emit() method and completed using the
|
||||
@ -52,7 +52,7 @@ final class PipelineSource
|
||||
*
|
||||
* @psalm-param TValue $value
|
||||
*
|
||||
* @return Future<null> Resolves with null when the emitted value has been consumed or fails with
|
||||
* @return Future<void> Resolves with null when the emitted value has been consumed or fails with
|
||||
* {@see DisposedException} if the pipeline has been disposed.
|
||||
*/
|
||||
public function emit(mixed $value): Future
|
||||
|
@ -14,10 +14,10 @@ final class TimeoutCancellationToken implements CancellationToken
|
||||
private CancellationToken $token;
|
||||
|
||||
/**
|
||||
* @param int $timeout Milliseconds until cancellation is requested.
|
||||
* @param float $timeout Seconds until cancellation is requested.
|
||||
* @param string $message Message for TimeoutException. Default is "Operation timed out".
|
||||
*/
|
||||
public function __construct(int $timeout, string $message = "Operation timed out")
|
||||
public function __construct(float $timeout, string $message = "Operation timed out")
|
||||
{
|
||||
$source = new CancellationTokenSource;
|
||||
$this->token = $source->getToken();
|
||||
|
@ -3,16 +3,16 @@
|
||||
namespace Amp\Test;
|
||||
|
||||
use Amp\AsyncGenerator;
|
||||
use Amp\Deferred;
|
||||
use Amp\DisposedException;
|
||||
use Amp\PHPUnit\AsyncTestCase;
|
||||
use Amp\PHPUnit\TestException;
|
||||
use Revolt\Future\Deferred;
|
||||
use function Amp\Future\spawn;
|
||||
use function Revolt\EventLoop\delay;
|
||||
use function Revolt\Future\spawn;
|
||||
|
||||
class AsyncGeneratorTest extends AsyncTestCase
|
||||
{
|
||||
const TIMEOUT = 100;
|
||||
private const TIMEOUT = 0.1;
|
||||
|
||||
public function testNonGeneratorCallable(): void
|
||||
{
|
||||
@ -65,7 +65,7 @@ class AsyncGeneratorTest extends AsyncTestCase
|
||||
$value = 1;
|
||||
$send = 2;
|
||||
$generator = new AsyncGenerator(function () use (&$result, $value) {
|
||||
delay(100); // Wait so send() is called before $yield().
|
||||
delay(0.1); // Wait so send() is called before $yield().
|
||||
$result = yield $value;
|
||||
});
|
||||
|
||||
@ -102,7 +102,7 @@ class AsyncGeneratorTest extends AsyncTestCase
|
||||
$value = 1;
|
||||
$exception = new \Exception;
|
||||
$generator = new AsyncGenerator(function () use (&$result, $value) {
|
||||
delay(100); // Wait so throw() is called before $yield().
|
||||
delay(0.1); // Wait so throw() is called before $yield().
|
||||
try {
|
||||
$result = yield $value;
|
||||
} catch (\Throwable $exception) {
|
||||
|
@ -64,7 +64,7 @@ class CancellationTest extends AsyncTestCase
|
||||
|
||||
$cancellationSource->cancel();
|
||||
|
||||
delay(10); // Tick event loop to invoke callbacks.
|
||||
delay(0.01); // Tick event loop to invoke callbacks.
|
||||
|
||||
self::assertInstanceOf(TestException::class, $reason);
|
||||
}
|
||||
|
@ -14,6 +14,6 @@ class DiscardTest extends AsyncTestCase
|
||||
|
||||
public function testCount(): void
|
||||
{
|
||||
self::assertSame(3, Pipeline\discard(Pipeline\fromIterable(['a', 1, false], 1))->join());
|
||||
self::assertSame(3, Pipeline\discard(Pipeline\fromIterable(['a', 1, false], 0.001))->join());
|
||||
}
|
||||
}
|
||||
|
@ -2,16 +2,16 @@
|
||||
|
||||
namespace Amp\Test\Pipeline;
|
||||
|
||||
use Amp\Future;
|
||||
use Amp\PHPUnit\AsyncTestCase;
|
||||
use Amp\PHPUnit\TestException;
|
||||
use Amp\Pipeline;
|
||||
use Revolt\Future\Future;
|
||||
use function Amp\Future\spawn;
|
||||
use function Revolt\EventLoop\delay;
|
||||
use function Revolt\Future\spawn;
|
||||
|
||||
class FromIterableTest extends AsyncTestCase
|
||||
{
|
||||
const TIMEOUT = 10;
|
||||
private const TIMEOUT = 0.1;
|
||||
|
||||
public function testSuccessfulPromises(): void
|
||||
{
|
||||
@ -55,9 +55,9 @@ class FromIterableTest extends AsyncTestCase
|
||||
{
|
||||
$expected = \range(1, 4);
|
||||
$pipeline = Pipeline\fromIterable([
|
||||
$this->asyncValue(30, 1),
|
||||
$this->asyncValue(10, 2),
|
||||
$this->asyncValue(20, 3),
|
||||
$this->asyncValue(0.03, 1),
|
||||
$this->asyncValue(0.01, 2),
|
||||
$this->asyncValue(0.02, 3),
|
||||
Future::complete(4),
|
||||
]);
|
||||
|
||||
@ -134,7 +134,7 @@ class FromIterableTest extends AsyncTestCase
|
||||
self::assertSame($count, $i);
|
||||
}
|
||||
|
||||
private function asyncValue(int $delay, mixed $value): Future
|
||||
private function asyncValue(float $delay, mixed $value): Future
|
||||
{
|
||||
return spawn(static function () use ($delay, $value): mixed {
|
||||
delay($delay);
|
||||
|
@ -3,12 +3,12 @@
|
||||
namespace Amp\Test\Pipeline;
|
||||
|
||||
use Amp\AsyncGenerator;
|
||||
use Amp\Future;
|
||||
use Amp\PHPUnit\AsyncTestCase;
|
||||
use Amp\PHPUnit\TestException;
|
||||
use Amp\Pipeline;
|
||||
use Revolt\Future\Future;
|
||||
use function Amp\Future\spawn;
|
||||
use function Revolt\EventLoop\delay;
|
||||
use function Revolt\Future\spawn;
|
||||
|
||||
class MergeTest extends AsyncTestCase
|
||||
{
|
||||
@ -30,7 +30,7 @@ class MergeTest extends AsyncTestCase
|
||||
public function testMerge(array $array, array $expected): void
|
||||
{
|
||||
$pipelines = \array_map(static function (array $iterator): Pipeline {
|
||||
return Pipeline\fromIterable($iterator, 10);
|
||||
return Pipeline\fromIterable($iterator, 0.01);
|
||||
}, $array);
|
||||
|
||||
$pipeline = Pipeline\merge($pipelines);
|
||||
@ -46,8 +46,8 @@ class MergeTest extends AsyncTestCase
|
||||
public function testMergeWithDelayedYields(): void
|
||||
{
|
||||
$pipelines = [];
|
||||
$values1 = [$this->asyncValue(10, 1), $this->asyncValue(50, 2), $this->asyncValue(70, 3)];
|
||||
$values2 = [$this->asyncValue(20, 4), $this->asyncValue(40, 5), $this->asyncValue(60, 6)];
|
||||
$values1 = [$this->asyncValue(0.01, 1), $this->asyncValue(0.05, 2), $this->asyncValue(0.07, 3)];
|
||||
$values2 = [$this->asyncValue(0.02, 4), $this->asyncValue(0.04, 5), $this->asyncValue(0.06, 6)];
|
||||
$expected = [1, 4, 5, 2, 6, 3];
|
||||
|
||||
$pipelines[] = new AsyncGenerator(function () use ($values1) {
|
||||
@ -100,7 +100,7 @@ class MergeTest extends AsyncTestCase
|
||||
Pipeline\merge([1]);
|
||||
}
|
||||
|
||||
private function asyncValue(int $delay, mixed $value): Future
|
||||
private function asyncValue(float $delay, mixed $value): Future
|
||||
{
|
||||
return spawn(static function () use ($delay, $value): mixed {
|
||||
delay($delay);
|
||||
|
@ -9,7 +9,7 @@ class ToArrayTest extends AsyncTestCase
|
||||
{
|
||||
public function testNonEmpty(): void
|
||||
{
|
||||
$pipeline = Pipeline\fromIterable(["abc", "foo", "bar"], 5);
|
||||
$pipeline = Pipeline\fromIterable(["abc", "foo", "bar"], 0.005);
|
||||
self::assertSame(["abc", "foo", "bar"], Pipeline\toArray($pipeline));
|
||||
}
|
||||
|
||||
|
@ -2,14 +2,13 @@
|
||||
|
||||
namespace Amp\Test;
|
||||
|
||||
use Amp\Future;
|
||||
use Amp\DisposedException;
|
||||
use Amp\PHPUnit\AsyncTestCase;
|
||||
use Amp\PipelineSource;
|
||||
use Revolt\Future\Deferred;
|
||||
use Revolt\Future\Future;
|
||||
use function Amp\Future\spawn;
|
||||
use function Revolt\EventLoop\defer;
|
||||
use function Revolt\EventLoop\delay;
|
||||
use function Revolt\Future\spawn;
|
||||
|
||||
class PipelineSourceTest extends AsyncTestCase
|
||||
{
|
||||
@ -197,7 +196,7 @@ class PipelineSourceTest extends AsyncTestCase
|
||||
|
||||
unset($pipeline); // Should relieve all back-pressure.
|
||||
|
||||
delay(5); // Tick event loop to invoke future callbacks.
|
||||
delay(0.005); // Tick event loop to invoke future callbacks.
|
||||
|
||||
self::assertSame(5, $invoked);
|
||||
|
||||
@ -283,7 +282,7 @@ class PipelineSourceTest extends AsyncTestCase
|
||||
$this->source->onDisposal($this->createCallback(1));
|
||||
self::assertTrue($this->source->isDisposed());
|
||||
|
||||
delay(10);
|
||||
delay(0.01);
|
||||
|
||||
$this->expectException(DisposedException::class);
|
||||
|
||||
|
@ -14,10 +14,10 @@ class TimeoutCancellationTokenTest extends AsyncTestCase
|
||||
public function testTimeout(): void
|
||||
{
|
||||
$line = __LINE__ + 1;
|
||||
$token = new TimeoutCancellationToken(10);
|
||||
$token = new TimeoutCancellationToken(0.01);
|
||||
|
||||
self::assertFalse($token->isRequested());
|
||||
delay(20);
|
||||
delay(0.02);
|
||||
self::assertTrue($token->isRequested());
|
||||
|
||||
try {
|
||||
@ -33,7 +33,7 @@ class TimeoutCancellationTokenTest extends AsyncTestCase
|
||||
|
||||
public function testWatcherCancellation(): void
|
||||
{
|
||||
$token = new TimeoutCancellationToken(1);
|
||||
$token = new TimeoutCancellationToken(0.001);
|
||||
self::assertSame(1, Loop::getInfo()["delay"]["enabled"]);
|
||||
unset($token);
|
||||
self::assertSame(0, Loop::getInfo()["delay"]["enabled"]);
|
||||
|
Loading…
x
Reference in New Issue
Block a user