Skip to content

Batch evaluation requests when possible #1746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dwds/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
## 16.0.1-dev

- Allow `LoadStrategy.serverPathForModule` and `LoadStrategy.sourceMapPathForModule`
to return `null` and add error handling.
- Allow the following API to return `null` and add error handling:
- `LoadStrategy.serverPathForModule`
- `LoadStrategy.sourceMapPathForModule`
- Expression evaluation performance improvement:
- Batch `ChromeProxyService.evaluate()` requests that are close in time
and are executed in the same library and scope.

## 16.0.0

Expand Down
5 changes: 4 additions & 1 deletion dwds/debug_extension/web/background.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import 'package:dwds/data/devtools_request.dart';
import 'package:dwds/data/extension_request.dart';
import 'package:dwds/data/serializers.dart';
import 'package:dwds/src/sockets.dart';
import 'package:dwds/src/utilities/batched_stream.dart';
// NOTE(annagrin): using 'package:dwds/src/utilities/batched_stream.dart'
// makes dart2js skip creating background.js, so we use a copy instead.
// import 'package:dwds/src/utilities/batched_stream.dart';
import 'package:dwds/src/web_utilities/batched_stream.dart';
import 'package:js/js.dart';
import 'package:js/js_util.dart' as js_util;
import 'package:pub_semver/pub_semver.dart';
Expand Down
7 changes: 6 additions & 1 deletion dwds/lib/src/connections/app_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@ class AppConnection {
/// The initial connection request sent from the application in the browser.
final ConnectRequest request;
final _startedCompleter = Completer<void>();
final _doneCompleter = Completer<void>();
final SocketConnection _connection;

AppConnection(this.request, this._connection);
AppConnection(this.request, this._connection) {
unawaited(_connection.sink.done.then((v) => _doneCompleter.complete()));
}

bool get isInKeepAlivePeriod => _connection.isInKeepAlivePeriod;
void shutDown() => _connection.shutdown();
bool get isStarted => _startedCompleter.isCompleted;
Future<void> get onStart => _startedCompleter.future;
bool get isDone => _doneCompleter.isCompleted;
Future<void> get onDone => _doneCompleter.future;

void runMain() {
if (_startedCompleter.isCompleted) {
Expand Down
2 changes: 1 addition & 1 deletion dwds/lib/src/injected/client.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 135 additions & 0 deletions dwds/lib/src/services/batched_expression_evaluator.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:collection/collection.dart';
import 'package:dwds/src/utilities/domain.dart';
import 'package:logging/logging.dart';
import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart';

import '../debugging/debugger.dart';
import '../debugging/location.dart';
import '../debugging/modules.dart';
import '../utilities/batched_stream.dart';
import 'expression_compiler.dart';
import 'expression_evaluator.dart';

class EvaluateRequest {
final String isolateId;
final String? libraryUri;
final String expression;
final Map<String, String>? scope;
final completer = Completer<RemoteObject>();

EvaluateRequest(this.isolateId, this.libraryUri, this.expression, this.scope);
}

class BatchedExpressionEvaluator extends ExpressionEvaluator {
final _logger = Logger('BatchedExpressionEvaluator');
final Debugger _debugger;
final _requestController =
BatchedStreamController<EvaluateRequest>(delay: 200);

BatchedExpressionEvaluator(
String entrypoint,
AppInspectorInterface inspector,
this._debugger,
Locations locations,
Modules modules,
ExpressionCompiler compiler,
) : super(entrypoint, inspector, _debugger, locations, modules, compiler) {
_requestController.stream.listen(_processRequest);
}

@override
void close() {
_logger.fine('Closed');
_requestController.close();
}

@override
Future<RemoteObject> evaluateExpression(
String isolateId,
String? libraryUri,
String expression,
Map<String, String>? scope,
) {
final request = EvaluateRequest(isolateId, libraryUri, expression, scope);
_requestController.sink.add(request);
return request.completer.future;
}

void _processRequest(List<EvaluateRequest> requests) async {
String? libraryUri;
String? isolateId;
Map<String, String>? scope;
List<EvaluateRequest> currentRequests = [];

for (var request in requests) {
libraryUri ??= request.libraryUri;
isolateId ??= request.isolateId;
scope ??= request.scope;

if (libraryUri != request.libraryUri ||
isolateId != request.isolateId ||
!MapEquality().equals(scope, request.scope)) {
_logger.fine('New batch due to');
if (libraryUri != request.libraryUri) {
_logger.fine(' - library uri: $libraryUri != ${request.libraryUri}');
}
if (isolateId != request.isolateId) {
_logger.fine(' - isolateId: $isolateId != ${request.isolateId}');
}
if (!MapEquality().equals(scope, request.scope)) {
_logger.fine(' - scope: $scope != ${request.scope}');
}

unawaited(_evaluateBatch(currentRequests));
currentRequests = [];
libraryUri = request.libraryUri;
isolateId = request.isolateId;
scope = request.scope;
}
currentRequests.add(request);
}
unawaited(_evaluateBatch(currentRequests));
}

Future<void> _evaluateBatch(List<EvaluateRequest> requests) async {
if (requests.isEmpty) return;

final first = requests.first;
if (requests.length == 1) {
if (first.completer.isCompleted) return;
return super
.evaluateExpression(
first.isolateId, first.libraryUri, first.expression, first.scope)
.then(requests.first.completer.complete);
}

final expressions = requests.map((r) => r.expression).join(', ');
final batchedExpression = '[ $expressions ]';

_logger.fine('Evaluating batch of expressions $batchedExpression');

final RemoteObject list = await super.evaluateExpression(
first.isolateId, first.libraryUri, batchedExpression, first.scope);

for (var i = 0; i < requests.length; i++) {
final request = requests[i];
if (request.completer.isCompleted) continue;
_logger.fine('Getting result out of a batch for ${request.expression}');
_debugger
.getProperties(list.objectId!,
offset: i, count: 1, length: requests.length)
.then((v) {
final result = v.first.value;
_logger.fine(
'Got result out of a batch for ${request.expression}: $result');
request.completer.complete(result);
});
}
}
}
7 changes: 6 additions & 1 deletion dwds/lib/src/services/chrome_proxy_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import '../utilities/dart_uri.dart';
import '../utilities/sdk_configuration.dart';
import '../utilities/shared.dart';
import 'expression_evaluator.dart';
import 'batched_expression_evaluator.dart';

/// A proxy from the chrome debug protocol to the dart vm service protocol.
class ChromeProxyService implements VmServiceInterface {
Expand Down Expand Up @@ -241,7 +242,7 @@ class ChromeProxyService implements VmServiceInterface {
final compiler = _compiler;
_expressionEvaluator = compiler == null
? null
: ExpressionEvaluator(
: BatchedExpressionEvaluator(
entrypoint,
inspector,
debugger,
Expand All @@ -259,6 +260,8 @@ class ChromeProxyService implements VmServiceInterface {
_startedCompleter.complete();
}));

unawaited(appConnection.onDone.then((_) => destroyIsolate()));

final isolateRef = inspector.isolateRef;
final timestamp = DateTime.now().millisecondsSinceEpoch;

Expand Down Expand Up @@ -301,6 +304,7 @@ class ChromeProxyService implements VmServiceInterface {
///
/// Clears out the [_inspector] and all related cached information.
void destroyIsolate() {
_logger.fine('Destroying isolate');
if (!_isIsolateRunning) return;
final isolate = inspector.isolate;
final isolateRef = inspector.isolateRef;
Expand All @@ -318,6 +322,7 @@ class ChromeProxyService implements VmServiceInterface {
_inspector = null;
_previousBreakpoints.clear();
_previousBreakpoints.addAll(isolate.breakpoints ?? []);
_expressionEvaluator?.close();
_consoleSubscription?.cancel();
_consoleSubscription = null;
}
Expand Down
4 changes: 4 additions & 0 deletions dwds/lib/src/services/expression_evaluator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:dwds/src/utilities/domain.dart';
import 'package:logging/logging.dart';
import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart';
Expand Down Expand Up @@ -61,6 +63,8 @@ class ExpressionEvaluator {
<String, String>{'type': '$severity', 'value': message});
}

void close() {}

/// Evaluate dart expression inside a given library.
///
/// Uses ExpressionCompiler interface to compile the expression to
Expand Down
85 changes: 85 additions & 0 deletions dwds/lib/src/web_utilities/batched_stream.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'package:async/async.dart';

/// Stream controller allowing to batch events.
class BatchedStreamController<T> {
static const _defaultBatchDelayMilliseconds = 1000;
static const _checkDelayMilliseconds = 100;

final int _batchDelayMilliseconds;

final StreamController<T> _inputController;
late StreamQueue<T> _inputQueue;

final StreamController<List<T>> _outputController;
final Completer<bool> _completer = Completer<bool>();

/// Create batched stream controller.
///
/// Collects events from input [sink] and emits them in batches to the
/// output [stream] every [delay] milliseconds. Keeps the original order.
BatchedStreamController({
int delay = _defaultBatchDelayMilliseconds,
}) : _batchDelayMilliseconds = delay,
_inputController = StreamController<T>(),
_outputController = StreamController<List<T>>() {
_inputQueue = StreamQueue<T>(_inputController.stream);
unawaited(_batchAndSendEvents());
}

/// Sink collecting events.
StreamSink<T> get sink => _inputController.sink;

/// Output stream of batch events.
Stream<List<T>> get stream => _outputController.stream;

/// Close the controller.
Future<dynamic> close() async {
unawaited(_inputController.close());
return _completer.future.then((value) => _outputController.close());
}

/// Send events to the output in a batch every [_batchDelayMilliseconds].
Future<void> _batchAndSendEvents() async {
const duration = Duration(milliseconds: _checkDelayMilliseconds);
final buffer = <T>[];

// Batch events every `_batchDelayMilliseconds`.
//
// Note that events might arrive at random intervals, so collecting
// a predetermined number of events to send in a batch might delay
// the batch indefinitely. Instead, check for new events every
// `_checkDelayMilliseconds` to make sure batches are sent in regular
// intervals.
var lastSendTime = DateTime.now().millisecondsSinceEpoch;
while (await _hasEventOrTimeOut(duration)) {
if (await _hasEventDuring(duration)) {
buffer.add(await _inputQueue.next);
}

final now = DateTime.now().millisecondsSinceEpoch;
if (now > lastSendTime + _batchDelayMilliseconds) {
lastSendTime = now;
if (buffer.isNotEmpty) {
_outputController.sink.add(List.from(buffer));
buffer.clear();
}
}
}

if (buffer.isNotEmpty) {
_outputController.sink.add(List.from(buffer));
}
_completer.complete(true);
}

Future<bool> _hasEventOrTimeOut(Duration duration) =>
_inputQueue.hasNext.timeout(duration, onTimeout: () => true);

Future<bool> _hasEventDuring(Duration duration) =>
_inputQueue.hasNext.timeout(duration, onTimeout: () => false);
}
19 changes: 19 additions & 0 deletions dwds/test/evaluate_common.dart
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,25 @@ void testAll({

tearDown(() async {});

test('in parallel (in a batch)', () async {
final library = isolate.rootLib!;
final evaluation1 = setup.service
.evaluate(isolateId, library.id!, 'MainClass(0).toString()');
final evaluation2 = setup.service
.evaluate(isolateId, library.id!, 'MainClass(1).toString()');

final results = await Future.wait([evaluation1, evaluation2]);
expect(
results[0],
const TypeMatcher<InstanceRef>().having(
(instance) => instance.valueAsString, 'valueAsString', '0'));

expect(
results[1],
const TypeMatcher<InstanceRef>().having(
(instance) => instance.valueAsString, 'valueAsString', '1'));
});

test('with scope override', () async {
final library = isolate.rootLib!;
final object = await setup.service
Expand Down
Loading