Skip to content

Commit cfa7637

Browse files
committed
Refactor RateLimitedScheduler to share pool with advisories download
1 parent 375cfee commit cfa7637

3 files changed

Lines changed: 32 additions & 33 deletions

File tree

lib/src/rate_limited_scheduler.dart

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,9 @@ class RateLimitedScheduler<J, V> {
6060
/// Jobs that have started running.
6161
final Set<J> _started = {};
6262

63-
RateLimitedScheduler(
64-
Future<V> Function(J) runJob, {
65-
required int maxConcurrentOperations,
66-
}) : _runJob = runJob,
67-
_pool = Pool(maxConcurrentOperations);
63+
RateLimitedScheduler(Future<V> Function(J) runJob, {required Pool pool})
64+
: _runJob = runJob,
65+
_pool = pool;
6866

6967
/// Pick the next task in [_queue] and run it.
7068
///

lib/src/source/hosted.dart

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -582,23 +582,21 @@ class HostedSource extends CachedSource {
582582
final Map<String, dynamic> body;
583583
final List<Advisory>? result;
584584
try {
585-
bodyText = await cache.hostedCache.advisoriesDownloadPool.withResource(
586-
() async {
587-
return await withAuthenticatedClient(cache, Uri.parse(hostedUrl), (
588-
client,
589-
) async {
590-
return await retryForHttp(
591-
'fetching advisories for "$packageName" from "$url"',
592-
() async {
593-
final request = http.Request('GET', url);
594-
request.attachPubApiHeaders();
595-
final response = await client.fetch(request);
596-
return response.body;
597-
},
598-
);
599-
});
600-
},
601-
);
585+
bodyText = await cache.hostedCache.pool.withResource(() async {
586+
return await withAuthenticatedClient(cache, Uri.parse(hostedUrl), (
587+
client,
588+
) async {
589+
return await retryForHttp(
590+
'fetching advisories for "$packageName" from "$url"',
591+
() async {
592+
final request = http.Request('GET', url);
593+
request.attachPubApiHeaders();
594+
final response = await client.fetch(request);
595+
return response.body;
596+
},
597+
);
598+
});
599+
});
602600
final decoded = jsonDecode(bodyText);
603601
if (decoded is! Map<String, dynamic>) {
604602
throw const FormatException('security advisories must be a mapping');
@@ -2177,6 +2175,9 @@ int? _parseCrc32c(Map<String, String> headers, String fileName) {
21772175

21782176
/// State that is cached for the HostedSource.
21792177
final class HostedSourceCache {
2178+
/// A pool to rate-limit concurrent network requests to pub.dev.
2179+
final Pool pool;
2180+
21802181
/// The scheduler used to fetch version information for packages.
21812182
///
21822183
/// This allows rate-limiting requests and speculative pre-fetching of
@@ -2196,12 +2197,11 @@ final class HostedSourceCache {
21962197
/// An in-memory cache to store the futures of fetched security advisories.
21972198
final Map<PackageId, Future<List<Advisory>?>> _advisoriesCache = {};
21982199

2199-
/// A pool to rate-limit concurrent security advisory network downloads.
2200-
final Pool advisoriesDownloadPool = Pool(8);
2200+
HostedSourceCache(HostedSource hosted) : this._(hosted, Pool(10));
22012201

2202-
HostedSourceCache(HostedSource hosted)
2202+
HostedSourceCache._(HostedSource hosted, this.pool)
22032203
: scheduler = RateLimitedScheduler(
22042204
(HostedRefAndCache refAndCache) => hosted.fetchVersions(refAndCache),
2205-
maxConcurrentOperations: 10,
2205+
pool: pool,
22062206
);
22072207
}

test/rate_limited_scheduler_test.dart

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import 'dart:async';
66

7+
import 'package:pool/pool.dart';
78
import 'package:pub/src/rate_limited_scheduler.dart';
89
import 'package:test/test.dart';
910

@@ -24,7 +25,7 @@ void main() {
2425
return i.toUpperCase();
2526
}
2627

27-
final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
28+
final scheduler = RateLimitedScheduler(f, pool: Pool(2));
2829
await scheduler.withPrescheduling((preschedule) async {
2930
preschedule('a');
3031
preschedule('b');
@@ -53,7 +54,7 @@ void main() {
5354
return i.toUpperCase();
5455
}
5556

56-
final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
57+
final scheduler = RateLimitedScheduler(f, pool: Pool(1));
5758

5859
await scheduler.withPrescheduling((preschedule1) async {
5960
await scheduler.withPrescheduling((preschedule2) async {
@@ -88,7 +89,7 @@ void main() {
8889
return i.toUpperCase();
8990
}
9091

91-
final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
92+
final scheduler = RateLimitedScheduler(f, pool: Pool(1));
9293

9394
Future? b;
9495
await scheduler.withPrescheduling((preschedule) async {
@@ -118,7 +119,7 @@ void main() {
118119
return i.toUpperCase();
119120
}
120121

121-
final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
122+
final scheduler = RateLimitedScheduler(f, pool: Pool(2));
122123

123124
completers['a']!.complete();
124125
expect(await scheduler.schedule('a'), 'A');
@@ -136,7 +137,7 @@ void main() {
136137
return i.toUpperCase();
137138
}
138139

139-
final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
140+
final scheduler = RateLimitedScheduler(f, pool: Pool(1));
140141
await scheduler.withPrescheduling((preschedule) async {
141142
preschedule('a');
142143
preschedule('b');
@@ -161,7 +162,7 @@ void main() {
161162
return i.toUpperCase();
162163
}
163164

164-
final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
165+
final scheduler = RateLimitedScheduler(f, pool: Pool(2));
165166

166167
await scheduler.withPrescheduling((preschedule) async {
167168
preschedule('a');
@@ -192,7 +193,7 @@ void main() {
192193
return Zone.current['zoneValue'] as String?;
193194
}
194195

195-
final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
196+
final scheduler = RateLimitedScheduler(f, pool: Pool(2));
196197
await scheduler.withPrescheduling((preschedule) async {
197198
runZoned(() {
198199
preschedule('a');

0 commit comments

Comments
 (0)