Skip to content

Commit cecd4ed

Browse files
authored
Handle backpressure earlier in pipeline (#2371)
1 parent 7954fb3 commit cecd4ed

File tree

5 files changed

+125
-41
lines changed

5 files changed

+125
-41
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
### Enhancements
2828

2929
- Cache parsed DSN ([#2365](https://github.com/getsentry/sentry-dart/pull/2365))
30-
30+
- Handle backpressure earlier in pipeline ([#2371](https://github.com/getsentry/sentry-dart/pull/2371))
31+
- Drops max un-awaited parallel tasks earlier, so event processors & callbacks are not executed for them.
32+
- Change by setting `SentryOptions.maxQueueSize`. Default is 30.
33+
3134
## 8.10.0-beta.2
3235

3336
### Fixes

dart/lib/src/sentry.dart

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import 'sentry_options.dart';
2424
import 'sentry_user_feedback.dart';
2525
import 'tracing.dart';
2626
import 'sentry_attachment/sentry_attachment.dart';
27+
import 'transport/data_category.dart';
28+
import 'transport/task_queue.dart';
2729

2830
/// Configuration options callback
2931
typedef OptionsConfiguration = FutureOr<void> Function(SentryOptions);
@@ -34,6 +36,7 @@ typedef AppRunner = FutureOr<void> Function();
3436
/// Sentry SDK main entry point
3537
class Sentry {
3638
static Hub _hub = NoOpHub();
39+
static TaskQueue<SentryId> _taskQueue = NoOpTaskQueue();
3740

3841
Sentry._();
3942

@@ -56,6 +59,11 @@ class Sentry {
5659
if (config is Future) {
5760
await config;
5861
}
62+
_taskQueue = DefaultTaskQueue<SentryId>(
63+
sentryOptions.maxQueueSize,
64+
sentryOptions.logger,
65+
sentryOptions.recorder,
66+
);
5967
} catch (exception, stackTrace) {
6068
sentryOptions.logger(
6169
SentryLevel.error,
@@ -181,12 +189,17 @@ class Sentry {
181189
Hint? hint,
182190
ScopeCallback? withScope,
183191
}) =>
184-
_hub.captureEvent(
185-
event,
186-
stackTrace: stackTrace,
187-
hint: hint,
188-
withScope: withScope,
189-
);
192+
_taskQueue.enqueue(
193+
() => _hub.captureEvent(
194+
event,
195+
stackTrace: stackTrace,
196+
hint: hint,
197+
withScope: withScope,
198+
),
199+
SentryId.empty(),
200+
event.type != null
201+
? DataCategory.fromItemType(event.type!)
202+
: DataCategory.unknown);
190203

191204
/// Reports the [throwable] and optionally its [stackTrace] to Sentry.io.
192205
static Future<SentryId> captureException(
@@ -195,11 +208,15 @@ class Sentry {
195208
Hint? hint,
196209
ScopeCallback? withScope,
197210
}) =>
198-
_hub.captureException(
199-
throwable,
200-
stackTrace: stackTrace,
201-
hint: hint,
202-
withScope: withScope,
211+
_taskQueue.enqueue(
212+
() => _hub.captureException(
213+
throwable,
214+
stackTrace: stackTrace,
215+
hint: hint,
216+
withScope: withScope,
217+
),
218+
SentryId.empty(),
219+
DataCategory.error,
203220
);
204221

205222
/// Reports a [message] to Sentry.io.
@@ -211,13 +228,17 @@ class Sentry {
211228
Hint? hint,
212229
ScopeCallback? withScope,
213230
}) =>
214-
_hub.captureMessage(
215-
message,
216-
level: level,
217-
template: template,
218-
params: params,
219-
hint: hint,
220-
withScope: withScope,
231+
_taskQueue.enqueue(
232+
() => _hub.captureMessage(
233+
message,
234+
level: level,
235+
template: template,
236+
params: params,
237+
hint: hint,
238+
withScope: withScope,
239+
),
240+
SentryId.empty(),
241+
DataCategory.unknown,
221242
);
222243

223244
/// Reports a [userFeedback] to Sentry.io.
@@ -236,7 +257,15 @@ class Sentry {
236257
Hint? hint,
237258
ScopeCallback? withScope,
238259
}) =>
239-
_hub.captureFeedback(feedback, hint: hint, withScope: withScope);
260+
_taskQueue.enqueue(
261+
() => _hub.captureFeedback(
262+
feedback,
263+
hint: hint,
264+
withScope: withScope,
265+
),
266+
SentryId.empty(),
267+
DataCategory.unknown,
268+
);
240269

241270
/// Close the client SDK
242271
static Future<void> close() async {
@@ -251,7 +280,7 @@ class Sentry {
251280
/// Last event id recorded by the current Hub
252281
static SentryId get lastEventId => _hub.lastEventId;
253282

254-
/// Adds a breacrumb to the current Scope
283+
/// Adds a breadcrumb to the current Scope
255284
static Future<void> addBreadcrumb(Breadcrumb crumb, {Hint? hint}) =>
256285
_hub.addBreadcrumb(crumb, hint: hint);
257286

dart/lib/src/sentry_client.dart

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import 'transport/http_transport.dart';
2525
import 'transport/noop_transport.dart';
2626
import 'transport/rate_limiter.dart';
2727
import 'transport/spotlight_http_transport.dart';
28-
import 'transport/task_queue.dart';
2928
import 'utils/isolate_utils.dart';
3029
import 'utils/regex_utils.dart';
3130
import 'utils/stacktrace_utils.dart';
@@ -39,10 +38,6 @@ const _defaultIpAddress = '{{auto}}';
3938
/// Logs crash reports and events to the Sentry.io service.
4039
class SentryClient {
4140
final SentryOptions _options;
42-
late final _taskQueue = TaskQueue<SentryId?>(
43-
_options.maxQueueSize,
44-
_options.logger,
45-
);
4641

4742
final Random? _random;
4843

@@ -630,9 +625,6 @@ class SentryClient {
630625
Future<SentryId?> _attachClientReportsAndSend(SentryEnvelope envelope) {
631626
final clientReport = _options.recorder.flush();
632627
envelope.addClientReport(clientReport);
633-
return _taskQueue.enqueue(
634-
() => _options.transport.send(envelope),
635-
SentryId.empty(),
636-
);
628+
return _options.transport.send(envelope);
637629
}
638630
}
Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,41 @@
11
import 'dart:async';
22

3+
import 'package:meta/meta.dart';
4+
35
import '../../sentry.dart';
6+
import '../client_reports/client_report_recorder.dart';
7+
import '../client_reports/discard_reason.dart';
8+
import 'data_category.dart';
49

510
typedef Task<T> = Future<T> Function();
611

7-
class TaskQueue<T> {
8-
TaskQueue(this._maxQueueSize, this._logger);
12+
@internal
13+
abstract class TaskQueue<T> {
14+
Future<T> enqueue(Task<T> task, T fallbackResult, DataCategory category);
15+
}
16+
17+
@internal
18+
class DefaultTaskQueue<T> implements TaskQueue<T> {
19+
DefaultTaskQueue(this._maxQueueSize, this._logger, this._recorder);
920

1021
final int _maxQueueSize;
1122
final SentryLogger _logger;
23+
final ClientReportRecorder _recorder;
1224

1325
int _queueCount = 0;
1426

15-
Future<T> enqueue(Task<T> task, T fallbackResult) async {
27+
@override
28+
Future<T> enqueue(
29+
Task<T> task,
30+
T fallbackResult,
31+
DataCategory category,
32+
) async {
1633
if (_queueCount >= _maxQueueSize) {
17-
_logger(SentryLevel.warning,
18-
'Task dropped due to backpressure. Avoid capturing in a tight loop.');
34+
_recorder.recordLostEvent(DiscardReason.queueOverflow, category);
35+
_logger(
36+
SentryLevel.warning,
37+
'Task dropped due to reaching max ($_maxQueueSize} parallel tasks.).',
38+
);
1939
return fallbackResult;
2040
} else {
2141
_queueCount++;
@@ -27,3 +47,15 @@ class TaskQueue<T> {
2747
}
2848
}
2949
}
50+
51+
@internal
52+
class NoOpTaskQueue<T> implements TaskQueue<T> {
53+
@override
54+
Future<T> enqueue(
55+
Task<T> task,
56+
T fallbackResult,
57+
DataCategory category,
58+
) {
59+
return task();
60+
}
61+
}

dart/test/transport/tesk_queue_test.dart

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import 'dart:async';
22

3+
import 'package:sentry/src/client_reports/discard_reason.dart';
4+
import 'package:sentry/src/transport/data_category.dart';
35
import 'package:sentry/src/transport/task_queue.dart';
46
import 'package:test/test.dart';
57

8+
import '../mocks/mock_client_report_recorder.dart';
69
import '../test_utils.dart';
710

811
void main() {
@@ -25,7 +28,7 @@ void main() {
2528
await Future.delayed(Duration(milliseconds: 1));
2629
completedTasks += 1;
2730
return 1 + 1;
28-
}, -1));
31+
}, -1, DataCategory.error));
2932
}
3033

3134
// This will always await the other futures, even if they are running longer, as it was scheduled after them.
@@ -48,7 +51,7 @@ void main() {
4851
print('Completed task $i');
4952
completedTasks += 1;
5053
return 1 + 1;
51-
}, -1));
54+
}, -1, DataCategory.error));
5255
}
5356

5457
print('Started waiting for first 5 tasks');
@@ -62,7 +65,7 @@ void main() {
6265
print('Completed task $i');
6366
completedTasks += 1;
6467
return 1 + 1;
65-
}, -1));
68+
}, -1, DataCategory.error));
6669
}
6770

6871
print('Started waiting for second 5 tasks');
@@ -83,7 +86,7 @@ void main() {
8386
await Future.delayed(Duration(milliseconds: 1));
8487
completedTasks += 1;
8588
return 1 + 1;
86-
}, -1);
89+
}, -1, DataCategory.error);
8790
}
8891
expect(completedTasks, 10);
8992
});
@@ -98,20 +101,45 @@ void main() {
98101
await sut.enqueue(() async {
99102
completedTasks += 1;
100103
throw Error();
101-
}, -1);
104+
}, -1, DataCategory.error);
102105
} catch (_) {
103106
// Ignore
104107
}
105108
}
106109
expect(completedTasks, 10);
107110
});
111+
112+
test('recording dropped event when category set', () async {
113+
final sut = fixture.getSut(maxQueueSize: 5);
114+
115+
for (int i = 0; i < 10; i++) {
116+
unawaited(sut.enqueue(() async {
117+
print('Task $i');
118+
return 1 + 1;
119+
}, -1, DataCategory.error));
120+
}
121+
122+
// This will always await the other futures, even if they are running longer, as it was scheduled after them.
123+
print('Started waiting for first 5 tasks');
124+
await Future.delayed(Duration(milliseconds: 1));
125+
print('Stopped waiting for first 5 tasks');
126+
127+
expect(fixture.clientReportRecorder.discardedEvents.length, 5);
128+
for (final event in fixture.clientReportRecorder.discardedEvents) {
129+
expect(event.reason, DiscardReason.queueOverflow);
130+
expect(event.category, DataCategory.error);
131+
expect(event.quantity, 1);
132+
}
133+
});
108134
});
109135
}
110136

111137
class Fixture {
112138
final options = defaultTestOptions();
113139

140+
late var clientReportRecorder = MockClientReportRecorder();
141+
114142
TaskQueue<int> getSut({required int maxQueueSize}) {
115-
return TaskQueue(maxQueueSize, options.logger);
143+
return DefaultTaskQueue(maxQueueSize, options.logger, clientReportRecorder);
116144
}
117145
}

0 commit comments

Comments
 (0)