Add a very basic protocol dispatcher and a few tests

Currently this doesn't actually invoke the compiler, it just always
returns the expanded output for "a {b: c}".
This commit is contained in:
Natalie Weizenbaum 2019-10-25 13:00:47 -07:00
parent ea4f9f7204
commit 7489d5796d
6 changed files with 447 additions and 0 deletions

View File

@ -0,0 +1,31 @@
// Copyright 2019 Google Inc. Use of this source code is governed by an
// MIT-style license that can be found in the LICENSE file or at
import 'dart:io';
import 'package:stream_channel/stream_channel.dart';
import 'package:sass_embedded/src/dispatcher.dart';
import 'package:sass_embedded/src/embedded_sass.pb.dart';
import 'package:sass_embedded/src/util/length_delimited_transformer.dart';
void main(List<String> args) {
if (args.isNotEmpty) {
"This executable is not intended to be executed with arguments.\n"
"See for details.");
// USAGE error from
exitCode = 64;
var dispatcher = Dispatcher(
StreamChannel.withGuarantees(stdin, stdout, allowSinkErrors: false)
dispatcher.listen((request) {
return OutboundMessage_CompileResponse()
..success = (OutboundMessage_CompileResponse_CompileSuccess()
..css = "a {\n b: c;\n}\n");

lib/src/dispatcher.dart Normal file
View File

@ -0,0 +1,120 @@
// Copyright 2019 Google Inc. Use of this source code is governed by an
// MIT-style license that can be found in the LICENSE file or at
import 'dart:async';
import 'dart:io';
import 'dart:typed_data';
import 'package:protobuf/protobuf.dart';
import 'package:stack_trace/stack_trace.dart';
import 'package:stream_channel/stream_channel.dart';
import 'embedded_sass.pb.dart';
/// A class that dispatches messages to and from the host.
class Dispatcher {
/// The channel of encoded protocol buffers, connected to the host.
final StreamChannel<Uint8List> _channel;
/// Creates a [Dispatcher] that sends and receives encoded protocol buffers
/// over [channel].
/// Listens for incoming `CompileRequests` and passes them to [callback].
/// The callback must return a `CompileResponse` which is sent to the host. It
/// doesn't need to set []; the [Dispatcher]
/// will take care of that.
/// This may only be called once.
void listen(
FutureOr<OutboundMessage_CompileResponse> callback(
InboundMessage_CompileRequest request)) { async {
InboundMessage message;
try {
try {
message = InboundMessage.fromBuffer(binaryMessage);
} on InvalidProtocolBufferException catch (error) {
throw _parseError(error.message);
switch (message.whichMessage()) {
case InboundMessage_Message.error:
var error = message.ensureError();
.write("Host reported ${} error");
if ( != -1) stderr.write(" with request ${}");
stderr.writeln(": ${error.message}");
// SOFTWARE error from
exitCode = 70;
case InboundMessage_Message.compileRequest:
var request = message.ensureCompileRequest();
var response = await callback(request); =;
_send(OutboundMessage()..compileResponse = response);
case InboundMessage_Message.notSet:
// PROTOCOL error from
exitCode = 76;
throw _parseError("InboundMessage.message is not set.");
// PROTOCOL error from
exitCode = 76;
throw _parseError(
"Unknown message type: ${message.toDebugString()}");
} on ProtocolError catch (error) { = _messageId(message) ?? -1;
stderr.write("Host caused ${} error");
if ( != -1) stderr.write(" with request ${}");
stderr.writeln(": ${error.message}");
_send(OutboundMessage()..error = error);
} catch (error, stackTrace) {
var errorMessage = "$error\n${Chain.forTrace(stackTrace)}";
stderr.write("Internal compiler error: $errorMessage");
..error = (ProtocolError()
..type = ProtocolError_ErrorType.INTERNAL = _messageId(message) ?? -1
..message = errorMessage));
/// Sends [message] to the host.
void _send(OutboundMessage message) =>
/// Returns a [ProtocolError] with type `PARSE` and the given [message].
ProtocolError _parseError(String message) => ProtocolError()
..type = ProtocolError_ErrorType.PARSE
..message = message;
/// Returns the id for [message] if it it's a request or response, or `null`
/// otherwise.
int _messageId(InboundMessage message) {
if (message == null) return null;
switch (message.whichMessage()) {
case InboundMessage_Message.compileRequest:
return message.ensureCompileRequest().id;
case InboundMessage_Message.canonicalizeResponse:
return message.ensureCanonicalizeResponse().id;
case InboundMessage_Message.importResponse:
return message.ensureImportResponse().id;
case InboundMessage_Message.functionCallRequest:
return message.ensureFunctionCallRequest().id;
case InboundMessage_Message.functionCallResponse:
return message.ensureFunctionCallResponse().id;
return null;

View File

@ -7,12 +7,17 @@ homepage:
sdk: '>=2.4.0 <3.0.0'
dart-sass-embedded: dart_sass_embedded
async: ">=1.13.0 <3.0.0"
protobuf: ^1.0.0
stack_trace: ^1.6.0
stream_channel: ">=1.6.0 <3.0.0"
grinder: ^0.8.0
protoc_plugin: ^19.0.0
path: ^1.6.0
test: ^1.0.0

test/embedded_process.dart Normal file
View File

@ -0,0 +1,183 @@
// Copyright 2019 Google Inc. Use of this source code is governed by an
// MIT-style license that can be found in the LICENSE file or at
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:async/async.dart';
import 'package:test/test.dart';
import 'package:sass_embedded/src/embedded_sass.pb.dart';
import 'package:sass_embedded/src/util/length_delimited_transformer.dart';
/// A wrapper for [Process] that provides a convenient API for testing the
/// embedded Sass process.
/// If the test fails, this will automatically print out any stderr and protocol
/// buffers from the process to aid debugging.
/// This API is based on the `test_process` package.
class EmbeddedProcess {
/// The underlying process.
final Process _process;
/// A [StreamQueue] that emits each outbound protocol buffer from the process.
StreamQueue<OutboundMessage> get outbound => _outbound;
StreamQueue<OutboundMessage> _outbound;
/// A [StreamQueue] that emits each line of stderr from the process.
StreamQueue<String> get stderr => _stderr;
StreamQueue<String> _stderr;
/// A splitter that can emit new copies of [outbound].
final StreamSplitter<OutboundMessage> _outboundSplitter;
/// A splitter that can emit new copies of [stderr].
final StreamSplitter<String> _stderrSplitter;
/// A sink into which inbound messages can be passed to the process.
final Sink<InboundMessage> inbound;
/// The raw standard input byte sink.
IOSink get stdin => _process.stdin;
/// A log that includes lines from [stderr] and human-friendly serializations
/// of protocol buffers from [outbound]
final _log = <String>[];
/// Whether [_log] has been passed to [printOnFailure] yet.
bool _loggedOutput = false;
/// Returns a [Future] which completes to the exit code of the process, once
/// it completes.
Future<int> get exitCode => _process.exitCode;
/// The process ID of the process.
int get pid =>;
/// Completes to [_process]'s exit code if it's exited, otherwise completes to
/// `null` immediately.
Future<int> get _exitCodeOrNull async =>
await exitCode.timeout(, onTimeout: () => null);
/// Starts a process.
/// [executable], [workingDirectory], [environment],
/// [includeParentEnvironment], and [runInShell] have the same meaning as for
/// [Process.start].
/// If [forwardOutput] is `true`, the process's [outbound] messages and
/// [stderr] will be printed to the console as they appear. This is only
/// intended to be set temporarily to help when debugging test failures.
static Future<EmbeddedProcess> start(
{String workingDirectory,
Map<String, String> environment,
bool includeParentEnvironment = true,
bool runInShell = false,
bool forwardOutput = false}) async {
// TODO(nweiz): Support running from a native executable.
var process = await Process.start(
Platform.executable, ["bin/dart_sass_embedded.dart"],
workingDirectory: workingDirectory,
environment: environment,
includeParentEnvironment: includeParentEnvironment,
runInShell: runInShell);
return EmbeddedProcess._(process, forwardOutput: forwardOutput);
/// Creates a [EmbeddedProcess] for [process].
/// The [forwardOutput] argument is the same as that to [start].
EmbeddedProcess._(Process process, {bool forwardOutput = false})
: _process = process,
_outboundSplitter = StreamSplitter(process.stdout
.map((message) => OutboundMessage.fromBuffer(message))),
_stderrSplitter = StreamSplitter(process.stderr
.transform(const LineSplitter())),
inbound = StreamSinkTransformer<InboundMessage, List<int>>.fromHandlers(
handleData: (message, sink) =>
.bind(process.stdin)) {
expect(_process.exitCode.then((_) => _logOutput()), completes,
reason: "Process `dart_sass_embedded` never exited.");
_outbound = StreamQueue(_outboundSplitter.split());
_stderr = StreamQueue(_stderrSplitter.split());
_outboundSplitter.split().listen((message) {
for (var line in message.toDebugString().split("\n")) {
if (forwardOutput) print(line);
_log.add(" $line");
_stderrSplitter.split().listen((line) {
if (forwardOutput) print(line);
_log.add("[e] $line");
/// A callback that's run when the test completes.
Future _tearDown() async {
// If the process is already dead, do nothing.
if (await _exitCodeOrNull != null) return;
// Log output now rather than waiting for the exitCode callback so that
// it's visible even if we time out waiting for the process to die.
await _logOutput();
/// Formats the contents of [_log] and passes them to [printOnFailure].
Future _logOutput() async {
if (_loggedOutput) return;
_loggedOutput = true;
var exitCodeOrNull = await _exitCodeOrNull;
// Wait a timer tick to ensure that all available lines have been flushed to
// [_log].
await Future.delayed(;
var buffer = StringBuffer();
buffer.write("Process `dart_sass_embedded` ");
if (exitCodeOrNull == null) {
buffer.write("was killed with SIGKILL in a tear-down.");
} else {
buffer.write("exited with exitCode $exitCodeOrNull.");
buffer.writeln(" Output:");
/// Kills the process (with SIGKILL on POSIX operating systems), and returns a
/// future that completes once it's dead.
/// If this is called after the process is already dead, it does nothing.
Future kill() async {
await exitCode;
/// Waits for the process to exit, and verifies that the exit code matches
/// [expectedExitCode] (if given).
/// If this is called after the process is already dead, it verifies its
/// existing exit code.
Future shouldExit([expectedExitCode]) async {
var exitCode = await this.exitCode;
if (expectedExitCode == null) return;
expect(exitCode, expectedExitCode,
reason: "Process `dart_sass_embedded` had an unexpected exit code.");

test/protocol_test.dart Normal file
View File

@ -0,0 +1,47 @@
// Copyright 2019 Google Inc. Use of this source code is governed by an
// MIT-style license that can be found in the LICENSE file or at
import 'package:test/test.dart';
import 'package:sass_embedded/src/embedded_sass.pb.dart';
import 'embedded_process.dart';
import 'utils.dart';
void main() {
EmbeddedProcess process;
setUp(() async {
process = await EmbeddedProcess.start();
group("gracefully handles a protocol error", () {
test("caused by an empty message", () async {
await expectParseError(process, "InboundMessage.message is not set.");
await process.kill();
test("caused by an invalid message", () async {
process.stdin.add([1, 0, 0, 0, 0]);
await expectParseError(
process, "Protocol message contained an invalid tag (zero).");
await process.kill();
test("without shutting down the compiler", () async {
await expectParseError(process, "InboundMessage.message is not set.");
process.inbound.add(compileString("a {b: c}"));
await expectLater(process.outbound, emits(isSuccess("a { b: c; }")));
await process.kill();
test("compiles a CSS from a string", () async {
process.inbound.add(compileString("a {b: c}"));
await expectLater(process.outbound, emits(isSuccess("a { b: c; }")));
await process.kill();

test/utils.dart Normal file
View File

@ -0,0 +1,61 @@
// Copyright 2019 Google Inc. Use of this source code is governed by an
// MIT-style license that can be found in the LICENSE file or at
import 'package:test/test.dart';
import 'package:sass_embedded/src/embedded_sass.pb.dart';
import 'embedded_process.dart';
/// Returns a [InboundMessage] that compiles the given plain CSS
/// string.
InboundMessage compileString(String css) => InboundMessage()
..compileRequest = (InboundMessage_CompileRequest()
..string = (InboundMessage_CompileRequest_StringInput()..source = css));
/// Asserts that [process] emits a [ProtocolError] parse error with the given
/// [message] on its protobuf stream and prints a notice on stderr.
Future<void> expectParseError(EmbeddedProcess process, message) async {
await expectLater(process.outbound,
emits(isProtocolError(-1, ProtocolError_ErrorType.PARSE, message)));
await expectLater(process.stderr, emits("Host caused parse error: $message"));
/// Asserts that an [OutboundMessage] is a [ProtocolError] with the given [id],
/// [type], and optionally [message].
Matcher isProtocolError(int id, ProtocolError_ErrorType type, [message]) =>
predicate((value) {
expect(value, isA<OutboundMessage>());
var outboundMessage = value as OutboundMessage;
expect(outboundMessage.hasError(), isTrue,
reason: "Expected $message to be a ProtocolError");
expect(, equals(id));
expect(outboundMessage.error.type, equals(type));
if (message != null) expect(outboundMessage.error.message, message);
return true;
/// Asserts that [message] is an [OutboundMessage] with a [CompileResponse] and
/// returns it.
OutboundMessage_CompileResponse getCompileResponse(value) {
expect(value, isA<OutboundMessage>());
var message = value as OutboundMessage;
expect(message.hasCompileResponse(), isTrue,
reason: "Expected $message to have a CompileResponse");
return message.compileResponse;
/// Asserts that an [OutboundMessage] is a [CompileResponse] with CSS that
/// matches [css].
/// If [css] is a [String], this automatically wraps it in
/// [equalsIgnoringWhitespace].
Matcher isSuccess(css) => predicate((value) {
var response = getCompileResponse(value);
expect(response.hasSuccess(), isTrue,
reason: "Expected $response to be successful");
css is String ? equalsIgnoringWhitespace(css) : css);
return true;