Skip to content

RSDK-6515 Add dial timeout and stats #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 12, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions lib/src/rpc/dial.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class DialOptions {

/// Whether the connection was made using mDNS
bool _usingMdns = false;

/// Timeout is the timeout for dial.
Duration timeout = Duration(seconds: 10);
}

/// The credentials used for connecting to the robot
Expand Down Expand Up @@ -110,10 +113,12 @@ class DialWebRtcOptions {

/// Connect to a robot at the provided address with the given options
Future<ClientChannelBase> dial(String address, DialOptions? options, String Function() sessionCallback) async {
final dialSW = Stopwatch()..start();
_logger.i('Connecting to address $address');
final opts = options ?? DialOptions();

if (opts.attemptMdns) {
final mdnsSW = Stopwatch()..start();
try {
final mdnsUri = await _searchMdns(address);
// Let downstream calls know when mdns was used. This is helpful to inform
Expand All @@ -129,16 +134,23 @@ Future<ClientChannelBase> dial(String address, DialOptions? options, String Func
} catch (e) {
_logger.d('Error dialing with mDNS; falling back to other methods', error: e);
}
mdnsSW.stop();
_logger.d('STATS: mDNS discovery took ${mdnsSW.elapsed}');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would love to capture this data somehow so we could historically track.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah NetCode will be working on that eventually (gathering this data and tracking it to get a sense of how our connection establishment performance looks over time). For now, I think we're ok just debug-logging a number of measurements in this dial code and in other connection establishment paths.

}

bool disableWebRtc = opts.webRtcOptions?.disable ?? false;
if (address.contains('.local.') || address.contains('localhost')) {
disableWebRtc = true;
}
final Future<ClientChannelBase> chan;
if (disableWebRtc) {
return _dialDirectGrpc(address, opts, sessionCallback);
chan = _dialDirectGrpc(address, opts, sessionCallback);
} else {
chan = _dialWebRtc(address, opts, sessionCallback);
}
return _dialWebRtc(address, opts, sessionCallback);
dialSW.stop();
_logger.d('STATS: dial function took ${dialSW.elapsed}');
return chan.timeout(opts.timeout);
}

Future<String> _searchMdns(String address) async {
Expand Down Expand Up @@ -189,6 +201,7 @@ Future<ClientChannelBase> _dialDirectGrpc(String address, DialOptions options, S
}

Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, String Function() sessionCallback) async {
final Stopwatch webrtcDialSW = Stopwatch()..start();
_logger.d('Dialing WebRTC to $address');
if (options.authEntity.isNullOrEmpty) {
if (options.externalAuthAddress.isNullOrEmpty) {
Expand All @@ -201,9 +214,11 @@ Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, Strin
}

final signalingServer = options.webRtcOptions?.signalingServerAddress ?? ((options._usingMdns) ? address : 'app.viam.com');
final sigServerSW = Stopwatch()..start();
_logger.d('Connecting to signaling server: $signalingServer');
final signalingChannel = await _dialDirectGrpc(signalingServer, options, sessionCallback);
_logger.d('Connected to signaling server: $signalingServer');
sigServerSW.stop();
_logger.d('STATS: connected to signaling in ${sigServerSW.elapsed}');
final signalingClient = SignalingServiceClient(signalingChannel, options: CallOptions(metadata: {'rpc-host': address}));
WebRTCConfig config;
try {
Expand All @@ -224,6 +239,7 @@ Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, Strin
'sdpSemantics': 'unified-plan',
});

final createPeerConnSW = Stopwatch()..start();
final peerConnection = await createPeerConnection({'iceServers': iceServers});
final dataChannel = await peerConnection.createDataChannel(
'data',
Expand All @@ -242,11 +258,28 @@ Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, Strin
..negotiated = true
..ordered = true,
);
createPeerConnSW.stop();
_logger.d('STATS: created peer connection and channels in ${createPeerConnSW.elapsed}');

String? uuid;
final didConnect = Completer();
final didSetRemoteDesc = Completer();

// updateCalls keeps track of how many times we've sent an update the
// signaling server. callUpdateDuration keeps track of the total amount of
// time we've spent sending caller updates to the signaling server. Report
// these values after the ICE connection state becomes 'completed'.
int updateCalls = 0;
Duration callUpdateDuration = Duration();
peerConnection.onIceConnectionState = (RTCIceConnectionState state) async {
if (state == RTCIceConnectionState.RTCIceConnectionStateCompleted) {
webrtcDialSW.stop();
_logger.d('STATS: WebRTC dialing took ${webrtcDialSW.elapsed}');
_logger.d('STATS: $updateCalls call updates to the signaling server were made');
_logger.d('STATS: spent $callUpdateDuration making call updates to the signaling server');
}
};

// If trickleICE is enabled, set onIceCandidate handler
if (!(options.webRtcOptions?.disableTrickleIce ?? config.disableTrickle)) {
final offer = await peerConnection.createOffer({});
Expand All @@ -272,7 +305,11 @@ Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, Strin
if (uuid != null) {
callUpdateRequest.uuid = uuid!;
}
final Stopwatch stopwatch = Stopwatch()..start();
await signalingClient.callUpdate(callUpdateRequest);
stopwatch.stop();
callUpdateDuration += stopwatch.elapsed;
updateCalls++;
} catch (error, st) {
_logger.e('Update ICECandidate error', error: error, stackTrace: st);
}
Expand Down Expand Up @@ -387,10 +424,13 @@ String _encodeSDPJsonStringToBase64String(String sdp) {
}

Future<AuthenticatedChannel> _authenticatedChannel(String address, DialOptions options, String Function() sessionsCallback) async {
final authSW = Stopwatch()..start();
String accessToken = options.accessToken ?? '';
if (accessToken.isNotEmpty && options.externalAuthAddress.isNullOrEmpty && options.externalAuthToEntity.isNullOrEmpty) {
_logger.d('Received pre-authenticated access token');
final addr = _hostAndPort(address, options.insecure);
authSW.stop();
_logger.d('STATS: authentication (pre-authenticated) took ${authSW.elapsed}');
return AuthenticatedChannel(addr.host, addr.port, accessToken, options.insecure, sessionsCallback);
}

Expand Down Expand Up @@ -439,6 +479,8 @@ Future<AuthenticatedChannel> _authenticatedChannel(String address, DialOptions o
}

final actual = _hostAndPort(address, options.insecure);
authSW.stop();
_logger.d('STATS: authentication took ${authSW.elapsed}');
return AuthenticatedChannel(actual.host, actual.port, accessToken, options.insecure, sessionsCallback);
}

Expand Down