From 391a942dfa8c547c4d8d02bfcb30cc36c536e875 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Mon, 18 May 2026 11:54:01 +0000 Subject: [PATCH 01/13] Parallelize security advisories and package status fetching --- lib/src/command/outdated.dart | 10 +++++++++- lib/src/solver/report.dart | 6 ++++++ lib/src/source/hosted.dart | 17 ++++++++--------- lib/src/system_cache.dart | 24 ++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/lib/src/command/outdated.dart b/lib/src/command/outdated.dart index 91afe727e80..8dfd59feff5 100644 --- a/lib/src/command/outdated.dart +++ b/lib/src/command/outdated.dart @@ -352,14 +352,22 @@ Consider using the Dart 2.19 sdk to migrate to null safety.'''); ), }; // Add all dependencies from the lockfile. + final idsToAnalyze = []; for (final id in [ ...currentPackages, ...upgradablePackages, ...resolvablePackages, ]) { if (!visited.add(id.name)) continue; - rows.add(await analyzeDependency(id.toRef())); + idsToAnalyze.add(id.toRef()); } + await cache.prefetchAdvisoriesAndStatus([ + ...currentPackages, + ...upgradablePackages, + ...resolvablePackages, + ]); + + rows.addAll(await Future.wait(idsToAnalyze.map(analyzeDependency))); if (!includeDevDependencies) { rows.removeWhere((r) => r.kind == _DependencyKind.dev); diff --git a/lib/src/solver/report.dart b/lib/src/solver/report.dart index 60c246a416a..8ab1aaa5650 100644 --- a/lib/src/solver/report.dart +++ b/lib/src/solver/report.dart @@ -216,6 +216,12 @@ $contentHashesDocumentationUrl final names = _newLockFile.packages.keys.toList(); names.remove(_rootPubspec.name); names.sort(); + if (_type != SolveType.downgrade) { + await _cache.prefetchAdvisoriesAndStatus( + names.map((name) => _newLockFile.packages[name]).nonNulls, + ); + } + var changes = 0; for (final name in names) { changes += await _reportPackage(name, output) ? 1 : 0; diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart index 4696a86eaea..370cc84bf2d 100644 --- a/lib/src/source/hosted.dart +++ b/lib/src/source/hosted.dart @@ -792,14 +792,7 @@ class HostedSource extends CachedSource { final parsedCacheAdvisoriesUpdated = DateTime.parse( cachedAdvisoriesUpdated, ); - final advisoriesUpdated = - (await status(id.toRef(), id.version, cache)).advisoriesUpdated; - - if ( - // We could not obtain the timestamp of latest advisory update. - advisoriesUpdated == null || - // The cached entry is too old. - advisoriesUpdated.isAfter(parsedCacheAdvisoriesUpdated)) { + if (advisoriesUpdated.isAfter(parsedCacheAdvisoriesUpdated)) { tryDeleteEntry(advisoriesCachePath); } else { return _extractAdvisoryDetailsForPackage(doc, id.toRef().name); @@ -828,6 +821,9 @@ class HostedSource extends CachedSource { final Map)> _responseCache = {}; + /// An in-memory cache to store the futures of fetched security advisories. + final Map?>> _advisoriesCache = {}; + /// If a cached version listing response for [ref] exists on disk and is less /// than [maxAge] old it is parsed and returned. /// @@ -1097,7 +1093,10 @@ class HostedSource extends CachedSource { SystemCache cache, Duration? maxAge, ) { - return _getAdvisories(id, cache, maxAge); + return _advisoriesCache.putIfAbsent( + id, + () => _getAdvisories(id, cache, maxAge), + ); } @override diff --git a/lib/src/system_cache.dart b/lib/src/system_cache.dart index 58a53ad3a5c..3e38e3a163f 100644 --- a/lib/src/system_cache.dart +++ b/lib/src/system_cache.dart @@ -170,6 +170,30 @@ Consider setting the `PUB_CACHE` variable manually. return pubspec; } + /// Prefetches the status and advisories for [packages] in parallel. + Future prefetchAdvisoriesAndStatus(Iterable packages) async { + final futures = >[]; + for (final id in packages) { + futures.add( + id.source.status( + id.toRef(), + id.version, + this, + maxAge: const Duration(days: 3), + ), + ); + final advisoriesFuture = id.source.getAdvisoriesForPackageVersion( + id, + this, + const Duration(days: 3), + ); + if (advisoriesFuture != null) { + futures.add(advisoriesFuture); + } + } + await Future.wait(futures); + } + /// Get the IDs of all versions that match [ref]. /// /// Note that this does *not* require the packages to be downloaded locally, From 4f636780d655eef9615d6a78156aae4457a455cf Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Mon, 18 May 2026 12:00:26 +0000 Subject: [PATCH 02/13] Fix clock-skew/timezone-sensitive cache validation in HostedSource._getAdvisories --- lib/src/source/hosted.dart | 5 ----- tool/test.dart | 1 - 2 files changed, 6 deletions(-) diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart index 370cc84bf2d..aa166d5732d 100644 --- a/lib/src/source/hosted.dart +++ b/lib/src/source/hosted.dart @@ -774,11 +774,6 @@ class HostedSource extends CachedSource { final stat = io.File(advisoriesCachePath).statSync(); if (stat.type == io.FileSystemEntityType.file) { - if (advisoriesUpdated.isAfter(stat.modified)) { - tryDeleteEntry(advisoriesCachePath); - return null; - } - try { final doc = jsonDecode(readTextFile(advisoriesCachePath)); if (doc is! Map) { diff --git a/tool/test.dart b/tool/test.dart index 98468b9943d..700a2b6ba94 100755 --- a/tool/test.dart +++ b/tool/test.dart @@ -1,5 +1,4 @@ #!/usr/bin/env -S dart run -r - // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. From 59321187840113ee4358064c5dbc9f7388dea080 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Mon, 18 May 2026 12:05:14 +0000 Subject: [PATCH 03/13] Add clock-skew and timezone safe stat.modified cache check with a 24-hour buffer --- lib/src/source/hosted.dart | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart index aa166d5732d..2b0c77228ea 100644 --- a/lib/src/source/hosted.dart +++ b/lib/src/source/hosted.dart @@ -774,6 +774,16 @@ class HostedSource extends CachedSource { final stat = io.File(advisoriesCachePath).statSync(); if (stat.type == io.FileSystemEntityType.file) { + // To tolerate timezone differences and clock skew between the local + // machine and the pub.dev server, we add a 24-hour buffer to + // stat.modified. + if (advisoriesUpdated.isAfter( + stat.modified.add(const Duration(hours: 24)), + )) { + tryDeleteEntry(advisoriesCachePath); + return null; + } + try { final doc = jsonDecode(readTextFile(advisoriesCachePath)); if (doc is! Map) { From 927d22c7a2ddd2e6cbf256e0a8152258f82c6883 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Mon, 18 May 2026 13:32:47 +0000 Subject: [PATCH 04/13] Add integration test validating that subsequent pub get fetches no advisories from network --- test/get/hosted/advisory_test.dart | 32 ++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/test/get/hosted/advisory_test.dart b/test/get/hosted/advisory_test.dart index 08288b51aba..b27f20a6795 100644 --- a/test/get/hosted/advisory_test.dart +++ b/test/get/hosted/advisory_test.dart @@ -489,4 +489,36 @@ Future main() async { File(p.join(d.sandbox, appPath, 'pubspec.lock')).deleteSync(); await pubGet(args: ['--offline']); }); + + test('subsequent pub get reads advisories and status from cache ' + 'without network calls', () async { + final server = await servePackages(); + server.serve('foo', '1.2.3'); + + await d.dir(appPath, [ + d.pubspec({ + 'name': 'app', + 'dependencies': {'foo': '^1.0.0'}, + }), + ]).create(); + + server.addAdvisory( + advisoryId: '123', + displayUrl: 'https://github.com/advisories/123', + affectedPackages: [ + AffectedPackage(name: 'foo', versions: ['1.2.3']), + ], + ); + + // First fetch: populates the cache + await pubGet(); + + // Configure server to fail on any subsequent HTTP requests + server.serveErrors(); + + // Second fetch: should read everything from cache and make 0 network + // requests (even with a stable lockfile, pub get still validates + // security advisories). + await pubGet(); + }); } From d51ddc04a5202c4697f7edd3c515328376980e0b Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Mon, 18 May 2026 13:58:44 +0000 Subject: [PATCH 05/13] Fix formatting discrepancy in tool/test.dart for Dart SDK dev channel --- tool/test.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/tool/test.dart b/tool/test.dart index 700a2b6ba94..98468b9943d 100755 --- a/tool/test.dart +++ b/tool/test.dart @@ -1,4 +1,5 @@ #!/usr/bin/env -S dart run -r + // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. From 2084a14a1ba722be157289e0146ced51aeda03c0 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Tue, 19 May 2026 07:24:24 +0000 Subject: [PATCH 06/13] Refactor integration test to be highly specific to new resolution and status cache bypass --- test/get/hosted/advisory_test.dart | 52 ++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/test/get/hosted/advisory_test.dart b/test/get/hosted/advisory_test.dart index b27f20a6795..26261888f7f 100644 --- a/test/get/hosted/advisory_test.dart +++ b/test/get/hosted/advisory_test.dart @@ -5,6 +5,7 @@ @TestOn('vm') library; +import 'dart:convert'; import 'dart:io'; import 'package:pub/src/path.dart'; @@ -490,8 +491,8 @@ Future main() async { await pubGet(args: ['--offline']); }); - test('subsequent pub get reads advisories and status from cache ' - 'without network calls', () async { + test('subsequent pub get reads advisories and status from cache on a new ' + 'resolution without network calls for advisories', () async { final server = await servePackages(); server.serve('foo', '1.2.3'); @@ -513,12 +514,49 @@ Future main() async { // First fetch: populates the cache await pubGet(); - // Configure server to fail on any subsequent HTTP requests - server.serveErrors(); + // Configure only the advisories endpoint to return an error + server.handle( + '/api/packages/foo/advisories', + (request) => Response.internalServerError(), + ); + + // Configure version listing to succeed only once (for version solving) + // and fail on any subsequent status checks (SolveReport cache bypass) + var fooRequestCount = 0; + final sha256 = await server.peekArchiveSha256('foo', '1.2.3'); + server.handle(RegExp(r'/api/packages/foo$'), (request) { + fooRequestCount++; + if (fooRequestCount > 1) { + return Response.internalServerError(); + } + return Response.ok( + jsonEncode({ + 'name': 'foo', + 'uploaders': ['nweiz@google.com'], + 'versions': [ + { + 'pubspec': { + 'name': 'foo', + 'version': '1.2.3', + 'environment': {'sdk': '^3.0.0'}, + }, + 'version': '1.2.3', + 'archive_url': '${server.url}/packages/foo/versions/1.2.3.tar.gz', + 'archive_sha256': sha256, + }, + ], + 'advisoriesUpdated': '1970-01-01T00:00:00.000', + }), + headers: {HttpHeaders.contentTypeHeader: server.contentType}, + ); + }); + + // Delete the lockfile to force a fresh resolution / version solving + File(p.join(d.sandbox, appPath, 'pubspec.lock')).deleteSync(); - // Second fetch: should read everything from cache and make 0 network - // requests (even with a stable lockfile, pub get still validates - // security advisories). + // Second fetch: a new resolution is triggered, version listing is fetched + // from the server (once), but SolveReport must read both status and + // advisories from the disk cache. await pubGet(); }); } From 1c6d4cae22515be75f8dc662066897f8a06235bc Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Tue, 19 May 2026 07:25:41 +0000 Subject: [PATCH 07/13] Refactor integration test to use fail() in mock handlers and output asserts --- test/get/hosted/advisory_test.dart | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/test/get/hosted/advisory_test.dart b/test/get/hosted/advisory_test.dart index 26261888f7f..05d8b7ecbaa 100644 --- a/test/get/hosted/advisory_test.dart +++ b/test/get/hosted/advisory_test.dart @@ -512,13 +512,12 @@ Future main() async { ); // First fetch: populates the cache - await pubGet(); + await pubGet(output: contains('affected by advisory')); // Configure only the advisories endpoint to return an error - server.handle( - '/api/packages/foo/advisories', - (request) => Response.internalServerError(), - ); + server.handle('/api/packages/foo/advisories', (request) { + fail('Should not fetch advisories from network!'); + }); // Configure version listing to succeed only once (for version solving) // and fail on any subsequent status checks (SolveReport cache bypass) @@ -527,7 +526,7 @@ Future main() async { server.handle(RegExp(r'/api/packages/foo$'), (request) { fooRequestCount++; if (fooRequestCount > 1) { - return Response.internalServerError(); + fail('Should not fetch version listing status a second time!'); } return Response.ok( jsonEncode({ @@ -557,6 +556,6 @@ Future main() async { // Second fetch: a new resolution is triggered, version listing is fetched // from the server (once), but SolveReport must read both status and // advisories from the disk cache. - await pubGet(); + await pubGet(output: contains('affected by advisory')); }); } From 939286f5dbb3e8bad8c41eb3a45f915eb2009dc1 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Tue, 19 May 2026 07:30:26 +0000 Subject: [PATCH 08/13] Update integration test to simplified version --- test/get/hosted/advisory_test.dart | 38 ++++-------------------------- 1 file changed, 4 insertions(+), 34 deletions(-) diff --git a/test/get/hosted/advisory_test.dart b/test/get/hosted/advisory_test.dart index 05d8b7ecbaa..5343fbdffd9 100644 --- a/test/get/hosted/advisory_test.dart +++ b/test/get/hosted/advisory_test.dart @@ -515,40 +515,10 @@ Future main() async { await pubGet(output: contains('affected by advisory')); // Configure only the advisories endpoint to return an error - server.handle('/api/packages/foo/advisories', (request) { - fail('Should not fetch advisories from network!'); - }); - - // Configure version listing to succeed only once (for version solving) - // and fail on any subsequent status checks (SolveReport cache bypass) - var fooRequestCount = 0; - final sha256 = await server.peekArchiveSha256('foo', '1.2.3'); - server.handle(RegExp(r'/api/packages/foo$'), (request) { - fooRequestCount++; - if (fooRequestCount > 1) { - fail('Should not fetch version listing status a second time!'); - } - return Response.ok( - jsonEncode({ - 'name': 'foo', - 'uploaders': ['nweiz@google.com'], - 'versions': [ - { - 'pubspec': { - 'name': 'foo', - 'version': '1.2.3', - 'environment': {'sdk': '^3.0.0'}, - }, - 'version': '1.2.3', - 'archive_url': '${server.url}/packages/foo/versions/1.2.3.tar.gz', - 'archive_sha256': sha256, - }, - ], - 'advisoriesUpdated': '1970-01-01T00:00:00.000', - }), - headers: {HttpHeaders.contentTypeHeader: server.contentType}, - ); - }); + server.handle( + '/api/packages/foo/advisories', + (request) => Response.internalServerError(), + ); // Delete the lockfile to force a fresh resolution / version solving File(p.join(d.sandbox, appPath, 'pubspec.lock')).deleteSync(); From 1d0926003615a2fed2e26aedca6d3afd539d4ac5 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Tue, 19 May 2026 09:13:30 +0000 Subject: [PATCH 09/13] Update integration test to final simplified stable-lockfile version --- test/get/hosted/advisory_test.dart | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/test/get/hosted/advisory_test.dart b/test/get/hosted/advisory_test.dart index 5343fbdffd9..383824d5ef2 100644 --- a/test/get/hosted/advisory_test.dart +++ b/test/get/hosted/advisory_test.dart @@ -5,7 +5,6 @@ @TestOn('vm') library; -import 'dart:convert'; import 'dart:io'; import 'package:pub/src/path.dart'; @@ -514,18 +513,21 @@ Future main() async { // First fetch: populates the cache await pubGet(output: contains('affected by advisory')); - // Configure only the advisories endpoint to return an error + // Mock both endpoints to return 500 Internal Server Error. + // If the client tries to hit the network for status or advisories, + // it will fail and not report the advisory. server.handle( '/api/packages/foo/advisories', (request) => Response.internalServerError(), ); + server.handle( + '/api/packages/foo', + (request) => Response.internalServerError(), + ); - // Delete the lockfile to force a fresh resolution / version solving - File(p.join(d.sandbox, appPath, 'pubspec.lock')).deleteSync(); - - // Second fetch: a new resolution is triggered, version listing is fetched - // from the server (once), but SolveReport must read both status and - // advisories from the disk cache. + // Second fetch: the resolution is up to date, and therefore reused. + // It should read advisories from disk cache and print + // the advisory warning, making zero network requests. await pubGet(output: contains('affected by advisory')); }); } From 3d7847df3890d3bdd075df28ed3dc9a7558b4ab9 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Tue, 19 May 2026 09:19:22 +0000 Subject: [PATCH 10/13] Encapsulate responseCache and advisoriesCache inside HostedSourceCache --- lib/src/source/hosted.dart | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart index 2b0c77228ea..b4b09aa45c3 100644 --- a/lib/src/source/hosted.dart +++ b/lib/src/source/hosted.dart @@ -817,18 +817,6 @@ class HostedSource extends CachedSource { await _fetchAdvisories(id.toRef(), cache); } - /// An in-memory cache to store the cached version listing loaded from - /// [_versionListingCachePath]. - /// - /// Invariant: Entries in this cache are the parsed version of the exact same - /// information cached on disk. I.e. if the entry is present in this cache, - /// there will not be a newer version on disk. - final Map)> _responseCache = - {}; - - /// An in-memory cache to store the futures of fetched security advisories. - final Map?>> _advisoriesCache = {}; - /// If a cached version listing response for [ref] exists on disk and is less /// than [maxAge] old it is parsed and returned. /// @@ -841,7 +829,7 @@ class HostedSource extends CachedSource { SystemCache cache, { Duration? maxAge, }) async { - final cachedInfo = _responseCache[ref]; + final cachedInfo = cache.hostedCache._responseCache[ref]; if (cachedInfo != null) { final (cacheTimestamp, versionInfo) = cachedInfo; final cacheAge = DateTime.now().difference(cacheTimestamp); @@ -880,7 +868,7 @@ class HostedSource extends CachedSource { Uri.file(cachePath), cache, ); - _responseCache[ref] = (parsedTimestamp, res); + cache.hostedCache._responseCache[ref] = (parsedTimestamp, res); return res; } } on io.IOException { @@ -932,7 +920,7 @@ class HostedSource extends CachedSource { ); // Delete the entry in the in-memory cache to maintain the invariant that // cached information in memory is the same as that on the disk. - _responseCache.remove(ref); + cache.hostedCache._responseCache.remove(ref); } on io.IOException catch (e) { // Not being able to write this cache is not fatal. Just move on... log.fine('Failed writing cache file. $e'); @@ -1098,7 +1086,7 @@ class HostedSource extends CachedSource { SystemCache cache, Duration? maxAge, ) { - return _advisoriesCache.putIfAbsent( + return cache.hostedCache._advisoriesCache.putIfAbsent( id, () => _getAdvisories(id, cache, maxAge), ); @@ -2192,6 +2180,18 @@ final class HostedSourceCache { final RateLimitedScheduler> scheduler; + /// An in-memory cache to store the cached version listing loaded from + /// [_versionListingCachePath]. + /// + /// Invariant: Entries in this cache are the parsed version of the exact same + /// information cached on disk. I.e. if the entry is present in this cache, + /// there will not be a newer version on disk. + final Map)> _responseCache = + {}; + + /// An in-memory cache to store the futures of fetched security advisories. + final Map?>> _advisoriesCache = {}; + HostedSourceCache(HostedSource hosted) : scheduler = RateLimitedScheduler( (HostedRefAndCache refAndCache) => hosted.fetchVersions(refAndCache), From 4efbcedbee31e85f385b929d4141ea4b52095183 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Tue, 19 May 2026 09:30:38 +0000 Subject: [PATCH 11/13] Update comments to final version --- lib/src/source/hosted.dart | 2 +- test/get/hosted/advisory_test.dart | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart index b4b09aa45c3..bf1f378ee39 100644 --- a/lib/src/source/hosted.dart +++ b/lib/src/source/hosted.dart @@ -2181,7 +2181,7 @@ final class HostedSourceCache { scheduler; /// An in-memory cache to store the cached version listing loaded from - /// [_versionListingCachePath]. + /// [HostedSource._versionListingCachePath]. /// /// Invariant: Entries in this cache are the parsed version of the exact same /// information cached on disk. I.e. if the entry is present in this cache, diff --git a/test/get/hosted/advisory_test.dart b/test/get/hosted/advisory_test.dart index 383824d5ef2..42f498e5979 100644 --- a/test/get/hosted/advisory_test.dart +++ b/test/get/hosted/advisory_test.dart @@ -525,7 +525,7 @@ Future main() async { (request) => Response.internalServerError(), ); - // Second fetch: the resolution is up to date, and therefore reused. + // Second fetch: the resolution is up to date, and therefore reused. // It should read advisories from disk cache and print // the advisory warning, making zero network requests. await pubGet(output: contains('affected by advisory')); From 375cfee4f768e0e4e000fc9bf9ce63eca5090312 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Fri, 22 May 2026 07:33:42 +0000 Subject: [PATCH 12/13] Limit concurrent security advisory network downloads using Pool --- lib/src/source/hosted.dart | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart index bf1f378ee39..b74184401ff 100644 --- a/lib/src/source/hosted.dart +++ b/lib/src/source/hosted.dart @@ -582,19 +582,23 @@ class HostedSource extends CachedSource { final Map body; final List? result; try { - bodyText = await withAuthenticatedClient(cache, Uri.parse(hostedUrl), ( - client, - ) async { - return await retryForHttp( - 'fetching advisories for "$packageName" from "$url"', - () async { - final request = http.Request('GET', url); - request.attachPubApiHeaders(); - final response = await client.fetch(request); - return response.body; - }, - ); - }); + bodyText = await cache.hostedCache.advisoriesDownloadPool.withResource( + () async { + return await withAuthenticatedClient(cache, Uri.parse(hostedUrl), ( + client, + ) async { + return await retryForHttp( + 'fetching advisories for "$packageName" from "$url"', + () async { + final request = http.Request('GET', url); + request.attachPubApiHeaders(); + final response = await client.fetch(request); + return response.body; + }, + ); + }); + }, + ); final decoded = jsonDecode(bodyText); if (decoded is! Map) { throw const FormatException('security advisories must be a mapping'); @@ -2192,6 +2196,9 @@ final class HostedSourceCache { /// An in-memory cache to store the futures of fetched security advisories. final Map?>> _advisoriesCache = {}; + /// A pool to rate-limit concurrent security advisory network downloads. + final Pool advisoriesDownloadPool = Pool(8); + HostedSourceCache(HostedSource hosted) : scheduler = RateLimitedScheduler( (HostedRefAndCache refAndCache) => hosted.fetchVersions(refAndCache), From cfa7637ea6ec21260a3aa41e3eb8425df7b06ba0 Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Fri, 22 May 2026 07:38:52 +0000 Subject: [PATCH 13/13] Refactor RateLimitedScheduler to share pool with advisories download --- lib/src/rate_limited_scheduler.dart | 8 ++--- lib/src/source/hosted.dart | 42 +++++++++++++-------------- test/rate_limited_scheduler_test.dart | 15 +++++----- 3 files changed, 32 insertions(+), 33 deletions(-) diff --git a/lib/src/rate_limited_scheduler.dart b/lib/src/rate_limited_scheduler.dart index d0efaed2f3f..adc6bb953af 100644 --- a/lib/src/rate_limited_scheduler.dart +++ b/lib/src/rate_limited_scheduler.dart @@ -60,11 +60,9 @@ class RateLimitedScheduler { /// Jobs that have started running. final Set _started = {}; - RateLimitedScheduler( - Future Function(J) runJob, { - required int maxConcurrentOperations, - }) : _runJob = runJob, - _pool = Pool(maxConcurrentOperations); + RateLimitedScheduler(Future Function(J) runJob, {required Pool pool}) + : _runJob = runJob, + _pool = pool; /// Pick the next task in [_queue] and run it. /// diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart index b74184401ff..e6e3bdb2257 100644 --- a/lib/src/source/hosted.dart +++ b/lib/src/source/hosted.dart @@ -582,23 +582,21 @@ class HostedSource extends CachedSource { final Map body; final List? result; try { - bodyText = await cache.hostedCache.advisoriesDownloadPool.withResource( - () async { - return await withAuthenticatedClient(cache, Uri.parse(hostedUrl), ( - client, - ) async { - return await retryForHttp( - 'fetching advisories for "$packageName" from "$url"', - () async { - final request = http.Request('GET', url); - request.attachPubApiHeaders(); - final response = await client.fetch(request); - return response.body; - }, - ); - }); - }, - ); + bodyText = await cache.hostedCache.pool.withResource(() async { + return await withAuthenticatedClient(cache, Uri.parse(hostedUrl), ( + client, + ) async { + return await retryForHttp( + 'fetching advisories for "$packageName" from "$url"', + () async { + final request = http.Request('GET', url); + request.attachPubApiHeaders(); + final response = await client.fetch(request); + return response.body; + }, + ); + }); + }); final decoded = jsonDecode(bodyText); if (decoded is! Map) { throw const FormatException('security advisories must be a mapping'); @@ -2177,6 +2175,9 @@ int? _parseCrc32c(Map headers, String fileName) { /// State that is cached for the HostedSource. final class HostedSourceCache { + /// A pool to rate-limit concurrent network requests to pub.dev. + final Pool pool; + /// The scheduler used to fetch version information for packages. /// /// This allows rate-limiting requests and speculative pre-fetching of @@ -2196,12 +2197,11 @@ final class HostedSourceCache { /// An in-memory cache to store the futures of fetched security advisories. final Map?>> _advisoriesCache = {}; - /// A pool to rate-limit concurrent security advisory network downloads. - final Pool advisoriesDownloadPool = Pool(8); + HostedSourceCache(HostedSource hosted) : this._(hosted, Pool(10)); - HostedSourceCache(HostedSource hosted) + HostedSourceCache._(HostedSource hosted, this.pool) : scheduler = RateLimitedScheduler( (HostedRefAndCache refAndCache) => hosted.fetchVersions(refAndCache), - maxConcurrentOperations: 10, + pool: pool, ); } diff --git a/test/rate_limited_scheduler_test.dart b/test/rate_limited_scheduler_test.dart index d2a88c875b6..b15d1afc6e1 100644 --- a/test/rate_limited_scheduler_test.dart +++ b/test/rate_limited_scheduler_test.dart @@ -4,6 +4,7 @@ import 'dart:async'; +import 'package:pool/pool.dart'; import 'package:pub/src/rate_limited_scheduler.dart'; import 'package:test/test.dart'; @@ -24,7 +25,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + final scheduler = RateLimitedScheduler(f, pool: Pool(2)); await scheduler.withPrescheduling((preschedule) async { preschedule('a'); preschedule('b'); @@ -53,7 +54,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1); + final scheduler = RateLimitedScheduler(f, pool: Pool(1)); await scheduler.withPrescheduling((preschedule1) async { await scheduler.withPrescheduling((preschedule2) async { @@ -88,7 +89,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1); + final scheduler = RateLimitedScheduler(f, pool: Pool(1)); Future? b; await scheduler.withPrescheduling((preschedule) async { @@ -118,7 +119,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + final scheduler = RateLimitedScheduler(f, pool: Pool(2)); completers['a']!.complete(); expect(await scheduler.schedule('a'), 'A'); @@ -136,7 +137,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1); + final scheduler = RateLimitedScheduler(f, pool: Pool(1)); await scheduler.withPrescheduling((preschedule) async { preschedule('a'); preschedule('b'); @@ -161,7 +162,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + final scheduler = RateLimitedScheduler(f, pool: Pool(2)); await scheduler.withPrescheduling((preschedule) async { preschedule('a'); @@ -192,7 +193,7 @@ void main() { return Zone.current['zoneValue'] as String?; } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + final scheduler = RateLimitedScheduler(f, pool: Pool(2)); await scheduler.withPrescheduling((preschedule) async { runZoned(() { preschedule('a');