From 38d13dbb5c49cd5a86137cb5dd5b33bfc43f0512 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Thu, 22 Aug 2019 22:37:48 +0200 Subject: [PATCH 1/2] Add LineReader (#64) --- lib/LineReader.php | 62 +++++++++++++++++++++++++++ test/LineReaderTest.php | 94 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 lib/LineReader.php create mode 100644 test/LineReaderTest.php diff --git a/lib/LineReader.php b/lib/LineReader.php new file mode 100644 index 0000000..52a285e --- /dev/null +++ b/lib/LineReader.php @@ -0,0 +1,62 @@ +source = $inputStream; + } + + /** + * @return Promise + */ + public function readLine(): Promise + { + return call(function () { + if (($pos = \strpos($this->buffer, "\n")) !== false) { + $line = \substr($this->buffer, 0, $pos); + $this->buffer = \substr($this->buffer, $pos + 1); + return \rtrim($line, "\r"); + } + + while (null !== $chunk = yield $this->source->read()) { + $this->buffer .= $chunk; + + if (($pos = \strpos($this->buffer, "\n")) !== false) { + $line = \substr($this->buffer, 0, $pos); + $this->buffer = \substr($this->buffer, $pos + 1); + return \rtrim($line, "\r"); + } + } + + if ($this->buffer === "") { + return null; + } + + $line = $this->buffer; + $this->buffer = ""; + return \rtrim($line, "\r"); + }); + } + + public function getBuffer(): string + { + return $this->buffer; + } + + public function clearBuffer() + { + $this->buffer = ""; + } +} diff --git a/test/LineReaderTest.php b/test/LineReaderTest.php new file mode 100644 index 0000000..b59b093 --- /dev/null +++ b/test/LineReaderTest.php @@ -0,0 +1,94 @@ +check(["abc"], ["abc"]); + } + + public function testMultiLineLf() + { + $this->check(["abc\nef"], ["abc", "ef"]); + } + + public function testMultiLineCrLf() + { + $this->check(["abc\r\nef"], ["abc", "ef"]); + } + + public function testMultiLineEmptyNewlineStart() + { + $this->check(["\r\nabc\r\nef\r\n"], ["", "abc", "ef"]); + } + + public function testMultiLineEmptyNewlineEnd() + { + $this->check(["abc\r\nef\r\n"], ["abc", "ef"]); + } + + public function testMultiLineEmptyNewlineMiddle() + { + $this->check(["abc\r\n\r\nef\r\n"], ["abc", "", "ef"]); + } + + public function testEmpty() + { + $this->check([], []); + } + + public function testEmptyCrLf() + { + $this->check(["\r\n"], [""]); + } + + public function testEmptyCr() + { + $this->check(["\r"], [""]); + } + + public function testMultiLineSlow() + { + $this->check(["a", "bc", "\r", "\n\r\nef\r", "\n"], ["abc", "", "ef"]); + } + + public function testClearBuffer() + { + wait(call(static function () { + $inputStream = new IteratorStream(Iterator\fromIterable(["a\nb\nc"])); + + $reader = new LineReader($inputStream); + self::assertSame("a", yield $reader->readLine()); + self::assertSame("b\nc", $reader->getBuffer()); + + $reader->clearBuffer(); + + self::assertSame("", $reader->getBuffer()); + self::assertNull(yield $reader->readLine()); + })); + } + + private function check(array $chunks, array $expectedLines) + { + wait(call(static function () use ($chunks, $expectedLines) { + $inputStream = new IteratorStream(Iterator\fromIterable($chunks)); + + $reader = new LineReader($inputStream); + $lines = []; + + while (null !== $line = yield $reader->readLine()) { + $lines[] = $line; + } + + self::assertSame($expectedLines, $lines); + self::assertSame("", $reader->getBuffer()); + })); + } +} From 1b84c81bb27d14a1040ceae34937e1ead1891664 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Thu, 22 Aug 2019 23:36:28 +0200 Subject: [PATCH 2/2] Add line delimited JSON parser (#65) --- lib/functions.php | 33 ++++++++++++++++++++++++- test/ParseLineDelimitedJsonTest.php | 38 +++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 test/ParseLineDelimitedJsonTest.php diff --git a/lib/functions.php b/lib/functions.php index c9944a0..434aef3 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -2,7 +2,9 @@ namespace Amp\ByteStream; +use Amp\Iterator; use Amp\Loop; +use Amp\Producer; use Amp\Promise; use function Amp\call; @@ -44,7 +46,7 @@ function pipe(InputStream $source, OutputStream $destination): Promise } /** - * @param \Amp\ByteStream\InputStream $source + * @param \Amp\ByteStream\InputStream $source * * @return \Amp\Promise */ @@ -99,6 +101,7 @@ function getOutputBufferStream(): ResourceOutputStream return $stream; } + /** * The STDIN stream for the process associated with the currently active event loop. * @@ -155,3 +158,31 @@ function getStderr(): ResourceOutputStream return $stream; } + +function parseLineDelimitedJson(InputStream $stream, bool $assoc = false, int $depth = 512, int $options = 0): Iterator +{ + return new Producer(static function (callable $emit) use ($stream, $assoc, $depth, $options) { + $reader = new LineReader($stream); + + while (null !== $line = yield $reader->readLine()) { + $line = \trim($line); + + if ($line === '') { + continue; + } + + /** @noinspection PhpComposerExtensionStubsInspection */ + $data = \json_decode($line, $assoc, $depth, $options); + /** @noinspection PhpComposerExtensionStubsInspection */ + $error = \json_last_error(); + + /** @noinspection PhpComposerExtensionStubsInspection */ + if ($error !== \JSON_ERROR_NONE) { + /** @noinspection PhpComposerExtensionStubsInspection */ + throw new StreamException('Failed to parse JSON: ' . \json_last_error_msg(), $error); + } + + yield $emit($data); + } + }); +} diff --git a/test/ParseLineDelimitedJsonTest.php b/test/ParseLineDelimitedJsonTest.php new file mode 100644 index 0000000..662b574 --- /dev/null +++ b/test/ParseLineDelimitedJsonTest.php @@ -0,0 +1,38 @@ + "\nbar\r\n"]), + \json_encode(['foo' => []]), + ]))))); + + self::assertEquals([ + (object) ['foo' => "\nbar\r\n"], + (object) ['foo' => []], + ], $result); + } + + public function testInvalidJson() + { + $this->expectException(StreamException::class); + $this->expectExceptionMessage('Failed to parse JSON'); + + wait(Iterator\toArray(parseLineDelimitedJson(new InMemoryStream('{')))); + } +}