diff --git a/lib/src/command/outdated.dart b/lib/src/command/outdated.dart index 91afe727e80..8dfd59feff5 100644 --- a/lib/src/command/outdated.dart +++ b/lib/src/command/outdated.dart @@ -352,14 +352,22 @@ Consider using the Dart 2.19 sdk to migrate to null safety.'''); ), }; // Add all dependencies from the lockfile. + final idsToAnalyze = []; for (final id in [ ...currentPackages, ...upgradablePackages, ...resolvablePackages, ]) { if (!visited.add(id.name)) continue; - rows.add(await analyzeDependency(id.toRef())); + idsToAnalyze.add(id.toRef()); } + await cache.prefetchAdvisoriesAndStatus([ + ...currentPackages, + ...upgradablePackages, + ...resolvablePackages, + ]); + + rows.addAll(await Future.wait(idsToAnalyze.map(analyzeDependency))); if (!includeDevDependencies) { rows.removeWhere((r) => r.kind == _DependencyKind.dev); diff --git a/lib/src/rate_limited_scheduler.dart b/lib/src/rate_limited_scheduler.dart index d0efaed2f3f..adc6bb953af 100644 --- a/lib/src/rate_limited_scheduler.dart +++ b/lib/src/rate_limited_scheduler.dart @@ -60,11 +60,9 @@ class RateLimitedScheduler { /// Jobs that have started running. final Set _started = {}; - RateLimitedScheduler( - Future Function(J) runJob, { - required int maxConcurrentOperations, - }) : _runJob = runJob, - _pool = Pool(maxConcurrentOperations); + RateLimitedScheduler(Future Function(J) runJob, {required Pool pool}) + : _runJob = runJob, + _pool = pool; /// Pick the next task in [_queue] and run it. /// diff --git a/lib/src/solver/report.dart b/lib/src/solver/report.dart index 60c246a416a..8ab1aaa5650 100644 --- a/lib/src/solver/report.dart +++ b/lib/src/solver/report.dart @@ -216,6 +216,12 @@ $contentHashesDocumentationUrl final names = _newLockFile.packages.keys.toList(); names.remove(_rootPubspec.name); names.sort(); + if (_type != SolveType.downgrade) { + await _cache.prefetchAdvisoriesAndStatus( + names.map((name) => _newLockFile.packages[name]).nonNulls, + ); + } + var changes = 0; for (final name in names) { changes += await _reportPackage(name, output) ? 1 : 0; diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart index 4696a86eaea..e6e3bdb2257 100644 --- a/lib/src/source/hosted.dart +++ b/lib/src/source/hosted.dart @@ -582,18 +582,20 @@ class HostedSource extends CachedSource { final Map body; final List? result; try { - bodyText = await withAuthenticatedClient(cache, Uri.parse(hostedUrl), ( - client, - ) async { - return await retryForHttp( - 'fetching advisories for "$packageName" from "$url"', - () async { - final request = http.Request('GET', url); - request.attachPubApiHeaders(); - final response = await client.fetch(request); - return response.body; - }, - ); + bodyText = await cache.hostedCache.pool.withResource(() async { + return await withAuthenticatedClient(cache, Uri.parse(hostedUrl), ( + client, + ) async { + return await retryForHttp( + 'fetching advisories for "$packageName" from "$url"', + () async { + final request = http.Request('GET', url); + request.attachPubApiHeaders(); + final response = await client.fetch(request); + return response.body; + }, + ); + }); }); final decoded = jsonDecode(bodyText); if (decoded is! Map) { @@ -774,7 +776,12 @@ class HostedSource extends CachedSource { final stat = io.File(advisoriesCachePath).statSync(); if (stat.type == io.FileSystemEntityType.file) { - if (advisoriesUpdated.isAfter(stat.modified)) { + // To tolerate timezone differences and clock skew between the local + // machine and the pub.dev server, we add a 24-hour buffer to + // stat.modified. + if (advisoriesUpdated.isAfter( + stat.modified.add(const Duration(hours: 24)), + )) { tryDeleteEntry(advisoriesCachePath); return null; } @@ -792,14 +799,7 @@ class HostedSource extends CachedSource { final parsedCacheAdvisoriesUpdated = DateTime.parse( cachedAdvisoriesUpdated, ); - final advisoriesUpdated = - (await status(id.toRef(), id.version, cache)).advisoriesUpdated; - - if ( - // We could not obtain the timestamp of latest advisory update. - advisoriesUpdated == null || - // The cached entry is too old. - advisoriesUpdated.isAfter(parsedCacheAdvisoriesUpdated)) { + if (advisoriesUpdated.isAfter(parsedCacheAdvisoriesUpdated)) { tryDeleteEntry(advisoriesCachePath); } else { return _extractAdvisoryDetailsForPackage(doc, id.toRef().name); @@ -819,15 +819,6 @@ class HostedSource extends CachedSource { await _fetchAdvisories(id.toRef(), cache); } - /// An in-memory cache to store the cached version listing loaded from - /// [_versionListingCachePath]. - /// - /// Invariant: Entries in this cache are the parsed version of the exact same - /// information cached on disk. I.e. if the entry is present in this cache, - /// there will not be a newer version on disk. - final Map)> _responseCache = - {}; - /// If a cached version listing response for [ref] exists on disk and is less /// than [maxAge] old it is parsed and returned. /// @@ -840,7 +831,7 @@ class HostedSource extends CachedSource { SystemCache cache, { Duration? maxAge, }) async { - final cachedInfo = _responseCache[ref]; + final cachedInfo = cache.hostedCache._responseCache[ref]; if (cachedInfo != null) { final (cacheTimestamp, versionInfo) = cachedInfo; final cacheAge = DateTime.now().difference(cacheTimestamp); @@ -879,7 +870,7 @@ class HostedSource extends CachedSource { Uri.file(cachePath), cache, ); - _responseCache[ref] = (parsedTimestamp, res); + cache.hostedCache._responseCache[ref] = (parsedTimestamp, res); return res; } } on io.IOException { @@ -931,7 +922,7 @@ class HostedSource extends CachedSource { ); // Delete the entry in the in-memory cache to maintain the invariant that // cached information in memory is the same as that on the disk. - _responseCache.remove(ref); + cache.hostedCache._responseCache.remove(ref); } on io.IOException catch (e) { // Not being able to write this cache is not fatal. Just move on... log.fine('Failed writing cache file. $e'); @@ -1097,7 +1088,10 @@ class HostedSource extends CachedSource { SystemCache cache, Duration? maxAge, ) { - return _getAdvisories(id, cache, maxAge); + return cache.hostedCache._advisoriesCache.putIfAbsent( + id, + () => _getAdvisories(id, cache, maxAge), + ); } @override @@ -2181,6 +2175,9 @@ int? _parseCrc32c(Map headers, String fileName) { /// State that is cached for the HostedSource. final class HostedSourceCache { + /// A pool to rate-limit concurrent network requests to pub.dev. + final Pool pool; + /// The scheduler used to fetch version information for packages. /// /// This allows rate-limiting requests and speculative pre-fetching of @@ -2188,9 +2185,23 @@ final class HostedSourceCache { final RateLimitedScheduler> scheduler; - HostedSourceCache(HostedSource hosted) + /// An in-memory cache to store the cached version listing loaded from + /// [HostedSource._versionListingCachePath]. + /// + /// Invariant: Entries in this cache are the parsed version of the exact same + /// information cached on disk. I.e. if the entry is present in this cache, + /// there will not be a newer version on disk. + final Map)> _responseCache = + {}; + + /// An in-memory cache to store the futures of fetched security advisories. + final Map?>> _advisoriesCache = {}; + + HostedSourceCache(HostedSource hosted) : this._(hosted, Pool(10)); + + HostedSourceCache._(HostedSource hosted, this.pool) : scheduler = RateLimitedScheduler( (HostedRefAndCache refAndCache) => hosted.fetchVersions(refAndCache), - maxConcurrentOperations: 10, + pool: pool, ); } diff --git a/lib/src/system_cache.dart b/lib/src/system_cache.dart index 58a53ad3a5c..3e38e3a163f 100644 --- a/lib/src/system_cache.dart +++ b/lib/src/system_cache.dart @@ -170,6 +170,30 @@ Consider setting the `PUB_CACHE` variable manually. return pubspec; } + /// Prefetches the status and advisories for [packages] in parallel. + Future prefetchAdvisoriesAndStatus(Iterable packages) async { + final futures = >[]; + for (final id in packages) { + futures.add( + id.source.status( + id.toRef(), + id.version, + this, + maxAge: const Duration(days: 3), + ), + ); + final advisoriesFuture = id.source.getAdvisoriesForPackageVersion( + id, + this, + const Duration(days: 3), + ); + if (advisoriesFuture != null) { + futures.add(advisoriesFuture); + } + } + await Future.wait(futures); + } + /// Get the IDs of all versions that match [ref]. /// /// Note that this does *not* require the packages to be downloaded locally, diff --git a/test/get/hosted/advisory_test.dart b/test/get/hosted/advisory_test.dart index 08288b51aba..42f498e5979 100644 --- a/test/get/hosted/advisory_test.dart +++ b/test/get/hosted/advisory_test.dart @@ -489,4 +489,45 @@ Future main() async { File(p.join(d.sandbox, appPath, 'pubspec.lock')).deleteSync(); await pubGet(args: ['--offline']); }); + + test('subsequent pub get reads advisories and status from cache on a new ' + 'resolution without network calls for advisories', () async { + final server = await servePackages(); + server.serve('foo', '1.2.3'); + + await d.dir(appPath, [ + d.pubspec({ + 'name': 'app', + 'dependencies': {'foo': '^1.0.0'}, + }), + ]).create(); + + server.addAdvisory( + advisoryId: '123', + displayUrl: 'https://github.com/advisories/123', + affectedPackages: [ + AffectedPackage(name: 'foo', versions: ['1.2.3']), + ], + ); + + // First fetch: populates the cache + await pubGet(output: contains('affected by advisory')); + + // Mock both endpoints to return 500 Internal Server Error. + // If the client tries to hit the network for status or advisories, + // it will fail and not report the advisory. + server.handle( + '/api/packages/foo/advisories', + (request) => Response.internalServerError(), + ); + server.handle( + '/api/packages/foo', + (request) => Response.internalServerError(), + ); + + // Second fetch: the resolution is up to date, and therefore reused. + // It should read advisories from disk cache and print + // the advisory warning, making zero network requests. + await pubGet(output: contains('affected by advisory')); + }); } diff --git a/test/rate_limited_scheduler_test.dart b/test/rate_limited_scheduler_test.dart index d2a88c875b6..b15d1afc6e1 100644 --- a/test/rate_limited_scheduler_test.dart +++ b/test/rate_limited_scheduler_test.dart @@ -4,6 +4,7 @@ import 'dart:async'; +import 'package:pool/pool.dart'; import 'package:pub/src/rate_limited_scheduler.dart'; import 'package:test/test.dart'; @@ -24,7 +25,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + final scheduler = RateLimitedScheduler(f, pool: Pool(2)); await scheduler.withPrescheduling((preschedule) async { preschedule('a'); preschedule('b'); @@ -53,7 +54,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1); + final scheduler = RateLimitedScheduler(f, pool: Pool(1)); await scheduler.withPrescheduling((preschedule1) async { await scheduler.withPrescheduling((preschedule2) async { @@ -88,7 +89,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1); + final scheduler = RateLimitedScheduler(f, pool: Pool(1)); Future? b; await scheduler.withPrescheduling((preschedule) async { @@ -118,7 +119,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + final scheduler = RateLimitedScheduler(f, pool: Pool(2)); completers['a']!.complete(); expect(await scheduler.schedule('a'), 'A'); @@ -136,7 +137,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1); + final scheduler = RateLimitedScheduler(f, pool: Pool(1)); await scheduler.withPrescheduling((preschedule) async { preschedule('a'); preschedule('b'); @@ -161,7 +162,7 @@ void main() { return i.toUpperCase(); } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + final scheduler = RateLimitedScheduler(f, pool: Pool(2)); await scheduler.withPrescheduling((preschedule) async { preschedule('a'); @@ -192,7 +193,7 @@ void main() { return Zone.current['zoneValue'] as String?; } - final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + final scheduler = RateLimitedScheduler(f, pool: Pool(2)); await scheduler.withPrescheduling((preschedule) async { runZoned(() { preschedule('a');