diff --git a/lib/functions.php b/lib/functions.php index c9944a0..434aef3 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -2,7 +2,9 @@ namespace Amp\ByteStream; +use Amp\Iterator; use Amp\Loop; +use Amp\Producer; use Amp\Promise; use function Amp\call; @@ -44,7 +46,7 @@ function pipe(InputStream $source, OutputStream $destination): Promise } /** - * @param \Amp\ByteStream\InputStream $source + * @param \Amp\ByteStream\InputStream $source * * @return \Amp\Promise */ @@ -99,6 +101,7 @@ function getOutputBufferStream(): ResourceOutputStream return $stream; } + /** * The STDIN stream for the process associated with the currently active event loop. * @@ -155,3 +158,31 @@ function getStderr(): ResourceOutputStream return $stream; } + +function parseLineDelimitedJson(InputStream $stream, bool $assoc = false, int $depth = 512, int $options = 0): Iterator +{ + return new Producer(static function (callable $emit) use ($stream, $assoc, $depth, $options) { + $reader = new LineReader($stream); + + while (null !== $line = yield $reader->readLine()) { + $line = \trim($line); + + if ($line === '') { + continue; + } + + /** @noinspection PhpComposerExtensionStubsInspection */ + $data = \json_decode($line, $assoc, $depth, $options); + /** @noinspection PhpComposerExtensionStubsInspection */ + $error = \json_last_error(); + + /** @noinspection PhpComposerExtensionStubsInspection */ + if ($error !== \JSON_ERROR_NONE) { + /** @noinspection PhpComposerExtensionStubsInspection */ + throw new StreamException('Failed to parse JSON: ' . \json_last_error_msg(), $error); + } + + yield $emit($data); + } + }); +} diff --git a/test/ParseLineDelimitedJsonTest.php b/test/ParseLineDelimitedJsonTest.php new file mode 100644 index 0000000..662b574 --- /dev/null +++ b/test/ParseLineDelimitedJsonTest.php @@ -0,0 +1,38 @@ + "\nbar\r\n"]), + \json_encode(['foo' => []]), + ]))))); + + self::assertEquals([ + (object) ['foo' => "\nbar\r\n"], + (object) ['foo' => []], + ], $result); + } + + public function testInvalidJson() + { + $this->expectException(StreamException::class); + $this->expectExceptionMessage('Failed to parse JSON'); + + wait(Iterator\toArray(parseLineDelimitedJson(new InMemoryStream('{')))); + } +}