forked from dart-lang/pub
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrate_limited_scheduler.dart
More file actions
152 lines (133 loc) · 4.82 KB
/
rate_limited_scheduler.dart
File metadata and controls
152 lines (133 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'package:pool/pool.dart';
/// Handles rate-limited scheduling of tasks.
///
/// Tasks are identified by a jobId of type [J] (should be useful as a Hash-key)
/// and run with a supplied async function.
///
/// Designed to allow speculatively running tasks that will likely be needed
/// later with [withPrescheduling].
///
/// Errors thrown by tasks scheduled with the `preschedule` callback will only
/// be triggered when you await the [Future] returned by [schedule].
///
/// The operation will run in the [Zone] that the task was in when enqueued.
///
/// If a task if `preschedule`d and later [schedule]d before the operation is
/// started, the task will go in front of the queue with the zone of the
/// [schedule] operation.
///
/// Example:
///
/// ```dart
/// // A scheduler that, given a uri, gets that page and returns the body
/// final scheduler = RateLimitedScheduler(http.read);
///
/// scheduler.withPresceduling((preschedule) {
/// // Start fetching `pub.dev` and `dart.dev` in the background.
/// scheduler.preschedule(Uri.parse('https://pub.dev/'));
/// scheduler.preschedule(Uri.parse('https://dart.dev/'));
/// // ... do time-consuming task.
/// // Now we actually need `pub.dev`.
/// final pubDevBody =
/// await scheduler.schedule(Uri.parse('https://pub.dev/'));
/// // if the `dart.dev` task has not started yet, it will be canceled when
/// // leaving `withPresceduling`.
/// });
/// ```
class RateLimitedScheduler<J, V> {
final Future<V> Function(J) _runJob;
/// The results of ongoing and finished jobs.
final Map<J, Completer<V>> _cache = <J, Completer<V>>{};
/// Provides sync access to completed results.
final Map<J, V> _results = <J, V>{};
/// Tasks that are waiting to be run.
final Queue<_Task<J>> _queue = Queue<_Task<J>>();
/// Rate limits the number of concurrent jobs.
final Pool _pool;
/// Jobs that have started running.
final Set<J> _started = {};
RateLimitedScheduler(Future<V> Function(J) runJob, {required Pool pool})
: _runJob = runJob,
_pool = pool;
/// Pick the next task in [_queue] and run it.
///
/// If the task is already in [_started] it will not be run again.
Future<void> _processNextTask() async {
if (_queue.isEmpty) {
return;
}
final task = _queue.removeFirst();
final completer = _cache[task.jobId]!;
if (!_started.add(task.jobId)) {
return;
}
// Use an async function to catch sync exceptions from _runJob.
Future<V> runJob() async {
return _results[task.jobId] = await task.zone.runUnary(
_runJob,
task.jobId,
);
}
completer.complete(runJob());
// Listen to errors on the completer:
// this will make errors thrown by [_run] not
// become uncaught.
//
// They will still show up for other listeners of the future.
try {
await completer.future;
} catch (_) {}
}
/// Calls [callback] with a function that can pre-schedule jobs.
///
/// When [callback] returns, all jobs that where prescheduled by [callback]
/// that have not started running will be removed from the work queue
/// (if they have been added separately by [schedule] they will still be
/// executed).
Future<R> withPrescheduling<R>(
FutureOr<R> Function(void Function(J) preschedule) callback,
) async {
final prescheduled = <_Task>{};
try {
return await callback((jobId) {
if (_started.contains(jobId)) return;
final task = _Task(jobId, Zone.current);
_cache.putIfAbsent(jobId, Completer.new);
_queue.addLast(task);
prescheduled.add(task);
unawaited(_pool.withResource(_processNextTask));
});
} finally {
_queue.removeWhere(prescheduled.contains);
}
}
/// Returns a future that completed with the result of running [jobId].
///
/// If [jobId] has already run, the cached result will be returned.
/// If [jobId] is not yet running, it will go to the front of the work queue
/// to be scheduled next when there are free resources.
Future<V> schedule(J jobId) {
final completer = _cache.putIfAbsent(jobId, Completer.new);
if (!_started.contains(jobId)) {
final task = _Task(jobId, Zone.current);
_queue.addFirst(task);
scheduleMicrotask(() => _pool.withResource(_processNextTask));
}
return completer.future;
}
/// Returns the result of running [jobId] if that is already done.
/// Otherwise returns `null`.
V? peek(J jobId) => _results[jobId];
}
class _Task<J> {
final J jobId;
final Zone zone;
_Task(this.jobId, this.zone);
@override
String toString() => jobId.toString();
}