Delimit messages using varints, as per sass/embedded-protocol#38 (#23)

See sass/embedded-protocol#37
This commit is contained in:
Natalie Weizenbaum 2020-12-22 16:06:16 -08:00 committed by GitHub
parent 0aa346cf7d
commit 3130888661
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 86 additions and 83 deletions

View File

@ -8,6 +8,7 @@ import 'dart:typed_data';
import 'package:async/async.dart'; import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart'; import 'package:stream_channel/stream_channel.dart';
import 'package:typed_data/typed_data.dart';
/// A [StreamChannelTransformer] that converts a channel that sends and receives /// A [StreamChannelTransformer] that converts a channel that sends and receives
/// arbitrarily-chunked binary data to one that sends and receives packets of /// arbitrarily-chunked binary data to one that sends and receives packets of
@ -21,17 +22,16 @@ final StreamChannelTransformer<Uint8List, List<int>> lengthDelimited =
/// into a stream of packet contents. /// into a stream of packet contents.
final lengthDelimitedDecoder = final lengthDelimitedDecoder =
StreamTransformer<List<int>, Uint8List>.fromBind((stream) { StreamTransformer<List<int>, Uint8List>.fromBind((stream) {
// The buffer into which the four-byte little-endian length of the next packet // The number of bits we've consumed so far to fill out [nextMessageLength].
// will be written. int nextMessageLengthBits = 0;
var lengthBuffer = Uint8List(4);
// The index of the next byte to write to [lengthBuffer]. Once this is equal // The length of the next message, in bytes.
// to [lengthBuffer.length], the full length is available. //
var lengthBufferIndex = 0; // This is built up from a [varint]. Once it's fully consumed, [buffer] is
// initialized.
// The length of the next message, in bytes, read from [lengthBuffer] once //
// it's full. // [varint]: https://developers.google.com/protocol-buffers/docs/encoding#varints
int nextMessageLength; int nextMessageLength = 0;
// The buffer into which the packet data itself is written. Initialized once // The buffer into which the packet data itself is written. Initialized once
// [nextMessageLength] is known. // [nextMessageLength] is known.
@ -53,47 +53,51 @@ final lengthDelimitedDecoder =
// multiple messages. // multiple messages.
var i = 0; var i = 0;
// Adds bytes from [chunk] to [destination] at [destinationIndex] without
// overflowing the bounds of [destination], and increments [i] for each byte
// written.
//
// Returns the number of bytes written.
int writeFromChunk(Uint8List destination, int destinationIndex) {
var bytesToWrite =
math.min(destination.length - destinationIndex, chunk.length - i);
destination.setRange(
destinationIndex, destinationIndex + bytesToWrite, chunk, i);
i += bytesToWrite;
return bytesToWrite;
}
while (i < chunk.length) { while (i < chunk.length) {
// We can be in one of two states here: // We can be in one of two states here:
// //
// * Both [nextMessageLength] and [buffer] are `null`, in which case we're // * [buffer] is `null`, in which case we're adding data to
// waiting until we have four bytes in [lengthBuffer] to know how big of // [nextMessageLength] until we reach a byte with its most significant
// a buffer to allocate. // bit set to 0.
// //
// * Neither [nextMessageLength] nor [buffer] are `null`, in which case // * [buffer] is not `null`, in which case we're waiting for [buffer] to
// we're waiting for [buffer] to have [nextMessageLength] in it before // have [nextMessageLength] bytes in it before we send it to
// we send it to [queue.local.sink] and start waiting for the next // [queue.local.sink] and start waiting for the next message.
// message. if (buffer == null) {
if (nextMessageLength == null) { var byte = chunk[i];
lengthBufferIndex += writeFromChunk(lengthBuffer, lengthBufferIndex);
if (lengthBufferIndex < 4) return;
nextMessageLength = // Varints encode data in the 7 lower bits of each byte, which we access
ByteData.view(lengthBuffer.buffer).getUint32(0, Endian.little); // by masking with 0x7f = 0b01111111.
nextMessageLength += (byte & 0x7f) << nextMessageLengthBits;
nextMessageLengthBits += 7;
i++;
// If the byte is higher than 0x7f = 0b01111111, that means its high bit
// is set which and so there are more bytes to consume before we know
// the full message length.
if (byte > 0x7f) continue;
// Otherwise, [nextMessageLength] is now finalized and we can allocate
// the data buffer.
buffer = Uint8List(nextMessageLength); buffer = Uint8List(nextMessageLength);
bufferIndex = 0; bufferIndex = 0;
} }
bufferIndex += writeFromChunk(buffer, bufferIndex); // Copy as many bytes as we can from [chunk] to [buffer], making sure not
// to try to copy more than the buffer can hold (if the chunk has another
// message after the current one) or more than the chunk has available (if
// the current message is split across multiple chunks).
var bytesToWrite =
math.min(buffer.length - bufferIndex, chunk.length - i);
buffer.setRange(bufferIndex, bufferIndex + bytesToWrite, chunk, i);
i += bytesToWrite;
bufferIndex += bytesToWrite;
if (bufferIndex < nextMessageLength) return; if (bufferIndex < nextMessageLength) return;
sink.add(Uint8List.view(buffer.buffer, 0, nextMessageLength)); // Once we've filled the buffer, emit it and reset our state.
lengthBufferIndex = 0; sink.add(buffer);
nextMessageLength = null; nextMessageLength = 0;
nextMessageLengthBits = 0;
buffer = null; buffer = null;
bufferIndex = null; bufferIndex = null;
} }
@ -106,9 +110,22 @@ final lengthDelimitedDecoder =
final lengthDelimitedEncoder = final lengthDelimitedEncoder =
StreamTransformer<Uint8List, List<int>>.fromHandlers( StreamTransformer<Uint8List, List<int>>.fromHandlers(
handleData: (message, sink) { handleData: (message, sink) {
var messageLength = Uint8List(4); var length = message.length;
ByteData.view(messageLength.buffer) if (length == 0) {
.setUint32(0, message.length, Endian.little); sink.add([0]);
sink.add(messageLength); return;
}
// Write the length in varint format, 7 bits at a time from least to most
// significant.
var lengthBuffer = Uint8Buffer();
while (length > 0) {
// The highest-order bit indicates whether more bytes are necessary to fully
// express the number. The lower 7 bits indicate the number's value.
lengthBuffer.add((length > 0x7f ? 0x80 : 0) | (length & 0x7f));
length >>= 7;
}
sink.add(Uint8List.view(lengthBuffer.buffer, 0, lengthBuffer.length));
sink.add(message); sink.add(message);
}); });

View File

@ -1,5 +1,5 @@
name: sass_embedded name: sass_embedded
version: 1.0.0-beta.5 version: 1.0.0-dev
description: An implementation of the Sass embedded protocol using Dart Sass. description: An implementation of the Sass embedded protocol using Dart Sass.
author: Sass Team author: Sass Team
homepage: https://github.com/sass/dart-sass-embedded homepage: https://github.com/sass/dart-sass-embedded
@ -19,6 +19,7 @@ dependencies:
source_span: ^1.1.0 source_span: ^1.1.0
stack_trace: ^1.6.0 stack_trace: ^1.6.0
stream_channel: ">=1.6.0 <3.0.0" stream_channel: ">=1.6.0 <3.0.0"
typed_data: ^1.1.0
dev_dependencies: dev_dependencies:
cli_pkg: ^1.0.0-beta.8 cli_pkg: ^1.0.0-beta.8

View File

@ -25,20 +25,20 @@ void main() {
test("encodes an empty message", () { test("encodes an empty message", () {
sink.add([]); sink.add([]);
sink.close(); sink.close();
expect(collectBytes(stream), completion(equals([0, 0, 0, 0]))); expect(collectBytes(stream), completion(equals([0])));
}); });
test("encodes a message of length 1", () { test("encodes a message of length 1", () {
sink.add([123]); sink.add([123]);
sink.close(); sink.close();
expect(collectBytes(stream), completion(equals([1, 0, 0, 0, 123]))); expect(collectBytes(stream), completion(equals([1, 123])));
}); });
test("encodes a message of length greater than 256", () { test("encodes a message of length greater than 256", () {
sink.add(List.filled(300, 1)); sink.add(List.filled(300, 1));
sink.close(); sink.close();
expect(collectBytes(stream), expect(collectBytes(stream),
completion(equals([44, 1, 0, 0, ...List.filled(300, 1)]))); completion(equals([172, 2, ...List.filled(300, 1)])));
}); });
test("encodes multiple messages", () { test("encodes multiple messages", () {
@ -46,10 +46,8 @@ void main() {
sink.add([20, 30]); sink.add([20, 30]);
sink.add([40, 50, 60]); sink.add([40, 50, 60]);
sink.close(); sink.close();
expect( expect(collectBytes(stream),
collectBytes(stream), completion(equals([1, 10, 2, 20, 30, 3, 40, 50, 60])));
completion(equals(
[1, 0, 0, 0, 10, 2, 0, 0, 0, 20, 30, 3, 0, 0, 0, 40, 50, 60])));
}); });
}); });
@ -64,75 +62,62 @@ void main() {
group("decodes an empty message", () { group("decodes an empty message", () {
test("from a single chunk", () { test("from a single chunk", () {
sink.add([0, 0, 0, 0]); sink.add([0]);
expect(queue, emits(isEmpty));
});
test("from multiple chunks", () {
sink.add([0, 0]);
sink.add([0, 0]);
expect(queue, emits(isEmpty));
});
test("from one chunk per byte", () {
sink..add([0])..add([0])..add([0])..add([0]);
expect(queue, emits(isEmpty)); expect(queue, emits(isEmpty));
}); });
test("from a chunk that contains more data", () { test("from a chunk that contains more data", () {
sink.add([0, 0, 0, 0, 1, 0, 0, 0, 100]); sink.add([0, 1, 100]);
expect(queue, emits(isEmpty)); expect(queue, emits(isEmpty));
}); });
}); });
group("decodes a longer message", () { group("decodes a longer message", () {
test("from a single chunk", () { test("from a single chunk", () {
sink.add([4, 0, 0, 0, 1, 2, 3, 4]); sink.add([172, 2, ...List.filled(300, 1)]);
expect(queue, emits([1, 2, 3, 4])); expect(queue, emits(List.filled(300, 1)));
}); });
test("from multiple chunks", () { test("from multiple chunks", () {
sink..add([4, 0])..add([0, 0, 1, 2])..add([3, 4]); sink..add([172])..add([2, 1])..add(List.filled(299, 1));
expect(queue, emits([1, 2, 3, 4])); expect(queue, emits(List.filled(300, 1)));
}); });
test("from one chunk per byte", () { test("from one chunk per byte", () {
for (var byte in [4, 0, 0, 0, 1, 2, 3, 4]) { for (var byte in [172, 2, ...List.filled(300, 1)]) {
sink.add([byte]); sink.add([byte]);
} }
expect(queue, emits([1, 2, 3, 4])); expect(queue, emits(List.filled(300, 1)));
}); });
test("from a chunk that contains more data", () { test("from a chunk that contains more data", () {
sink.add([4, 0, 0, 0, 1, 2, 3, 4, 1, 0, 0, 0]); sink.add([172, 2, ...List.filled(300, 1), 1, 10]);
expect(queue, emits([1, 2, 3, 4]));
});
test("of length greater than 256", () {
sink.add([44, 1, 0, 0, ...List.filled(300, 1)]);
expect(queue, emits(List.filled(300, 1))); expect(queue, emits(List.filled(300, 1)));
}); });
}); });
group("decodes multiple messages", () { group("decodes multiple messages", () {
test("from single chunk", () { test("from single chunk", () {
sink.add([4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]); sink.add([4, 1, 2, 3, 4, 2, 101, 102]);
expect(queue, emits([1, 2, 3, 4])); expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits([101, 102])); expect(queue, emits([101, 102]));
}); });
test("from multiple chunks", () { test("from multiple chunks", () {
sink..add([4, 0])..add([0, 0, 1, 2, 3, 4, 2, 0])..add([0, 0, 101, 102]); sink
..add([4])
..add([1, 2, 3, 4, 172])
..add([2, ...List.filled(300, 1)]);
expect(queue, emits([1, 2, 3, 4])); expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits([101, 102])); expect(queue, emits(List.filled(300, 1)));
}); });
test("from one chunk per byte", () { test("from one chunk per byte", () {
for (var byte in [4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]) { for (var byte in [4, 1, 2, 3, 4, 172, 2, ...List.filled(300, 1)]) {
sink.add([byte]); sink.add([byte]);
} }
expect(queue, emits([1, 2, 3, 4])); expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits([101, 102])); expect(queue, emits(List.filled(300, 1)));
}); });
}); });
}); });

View File

@ -26,7 +26,7 @@ void main() {
}); });
test("caused by an invalid message", () async { test("caused by an invalid message", () async {
process.stdin.add([1, 0, 0, 0, 0]); process.stdin.add([1, 0]);
await expectParseError( await expectParseError(
process, "Protocol message contained an invalid tag (zero)."); process, "Protocol message contained an invalid tag (zero).");
expect(await process.exitCode, 76); expect(await process.exitCode, 76);