mirror of
https://github.com/danog/amp.git
synced 2024-12-11 17:09:40 +01:00
Add callback to transform()
Suggestion for an apply()-like method.
This commit is contained in:
parent
1b4863b7b3
commit
3755155e51
@ -56,17 +56,15 @@ final class AsyncGenerator implements Stream
|
||||
}
|
||||
|
||||
/**
|
||||
* @return TransformationStream<TValue>
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function transform(): TransformationStream
|
||||
public function transform(callable $operator = null): TransformationStream
|
||||
{
|
||||
return $this->generator->transform();
|
||||
return $this->generator->transform($operator);
|
||||
}
|
||||
|
||||
/**
|
||||
* Continues the async generator, resolving the back-pressure promise with null.
|
||||
*
|
||||
* @return Promise<array>
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function continue(): Promise
|
||||
{
|
||||
|
@ -27,11 +27,7 @@ class AutoDisposingGenerator extends AutoDisposingStream implements GeneratorStr
|
||||
}
|
||||
|
||||
/**
|
||||
* @param mixed $value
|
||||
*
|
||||
* @psalm-param TSend $value
|
||||
*
|
||||
* @return Promise<array>
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function send($value): Promise
|
||||
{
|
||||
@ -39,9 +35,7 @@ class AutoDisposingGenerator extends AutoDisposingStream implements GeneratorStr
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \Throwable $exception
|
||||
*
|
||||
* @return Promise<array>
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function throw(\Throwable $exception): Promise
|
||||
{
|
||||
|
@ -31,7 +31,7 @@ class AutoDisposingStream implements Stream
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Promise<array>
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function continue(): Promise
|
||||
{
|
||||
@ -39,7 +39,7 @@ class AutoDisposingStream implements Stream
|
||||
}
|
||||
|
||||
/**
|
||||
* @return void
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function dispose()
|
||||
{
|
||||
@ -47,10 +47,10 @@ class AutoDisposingStream implements Stream
|
||||
}
|
||||
|
||||
/**
|
||||
* @return TransformationStream
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function transform(): TransformationStream
|
||||
public function transform(callable $operator = null): TransformationStream
|
||||
{
|
||||
return $this->stream->transform();
|
||||
return $this->stream->transform($operator);
|
||||
}
|
||||
}
|
||||
|
@ -131,9 +131,9 @@ trait Yielder
|
||||
return $deferred->promise();
|
||||
}
|
||||
|
||||
public function transform(): TransformationStream
|
||||
public function transform(callable $operator = null): TransformationStream
|
||||
{
|
||||
return new TransformationStream($this);
|
||||
return new TransformationStream($this, $operator);
|
||||
}
|
||||
|
||||
private function createStream(): Stream
|
||||
|
@ -29,7 +29,9 @@ interface Stream
|
||||
/**
|
||||
* Returns a stream object with fluent transformation methods.
|
||||
*
|
||||
* @param callable(TransformationStream):Stream $operator
|
||||
*
|
||||
* @return TransformationStream
|
||||
*/
|
||||
public function transform(): TransformationStream;
|
||||
public function transform(callable $operator = null): TransformationStream;
|
||||
}
|
||||
|
@ -11,9 +11,13 @@ final class TransformationStream implements Stream
|
||||
/** @var Stream<TValue> */
|
||||
private $stream;
|
||||
|
||||
public function __construct(Stream $stream)
|
||||
public function __construct(Stream $stream, callable $operator = null)
|
||||
{
|
||||
$this->stream = $stream;
|
||||
$this->stream = $stream instanceof self ? $stream->stream : $stream;
|
||||
|
||||
if ($operator !== null) {
|
||||
$this->stream = $this->apply($operator);
|
||||
}
|
||||
}
|
||||
|
||||
public function continue(): Promise
|
||||
@ -26,11 +30,26 @@ final class TransformationStream implements Stream
|
||||
$this->stream->dispose();
|
||||
}
|
||||
|
||||
public function transform(): self
|
||||
public function transform(callable $operator = null): self
|
||||
{
|
||||
if ($operator === null) {
|
||||
return $this;
|
||||
}
|
||||
|
||||
return new self($this->apply($operator));
|
||||
}
|
||||
|
||||
private function apply(callable $operator): self
|
||||
{
|
||||
$stream = $operator($this);
|
||||
|
||||
if ($stream instanceof Stream) {
|
||||
throw new \TypeError('$operator must return an instance of ' . Stream::class);
|
||||
}
|
||||
|
||||
return $stream;
|
||||
}
|
||||
|
||||
/**
|
||||
* @template TMap
|
||||
*
|
||||
@ -87,7 +106,7 @@ final class TransformationStream implements Stream
|
||||
return new self(new AsyncGenerator(function (callable $yield) use ($count) {
|
||||
$skipped = 0;
|
||||
while (list($value) = yield $this->stream->continue()) {
|
||||
if (++$skipped < $count) {
|
||||
if (++$skipped <= $count) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user