Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion lib/src/command/outdated.dart
Original file line number Diff line number Diff line change
Expand Up @@ -352,14 +352,22 @@ Consider using the Dart 2.19 sdk to migrate to null safety.''');
),
};
// Add all dependencies from the lockfile.
final idsToAnalyze = <PackageRef>[];
for (final id in [
...currentPackages,
...upgradablePackages,
...resolvablePackages,
]) {
if (!visited.add(id.name)) continue;
rows.add(await analyzeDependency(id.toRef()));
idsToAnalyze.add(id.toRef());
}
await cache.prefetchAdvisoriesAndStatus([
...currentPackages,
...upgradablePackages,
...resolvablePackages,
]);

rows.addAll(await Future.wait(idsToAnalyze.map(analyzeDependency)));

if (!includeDevDependencies) {
rows.removeWhere((r) => r.kind == _DependencyKind.dev);
Expand Down
8 changes: 3 additions & 5 deletions lib/src/rate_limited_scheduler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ class RateLimitedScheduler<J, V> {
/// Jobs that have started running.
final Set<J> _started = {};

RateLimitedScheduler(
Future<V> Function(J) runJob, {
required int maxConcurrentOperations,
}) : _runJob = runJob,
_pool = Pool(maxConcurrentOperations);
RateLimitedScheduler(Future<V> Function(J) runJob, {required Pool pool})
: _runJob = runJob,
_pool = pool;

/// Pick the next task in [_queue] and run it.
///
Expand Down
6 changes: 6 additions & 0 deletions lib/src/solver/report.dart
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ $contentHashesDocumentationUrl
final names = _newLockFile.packages.keys.toList();
names.remove(_rootPubspec.name);
names.sort();
if (_type != SolveType.downgrade) {
await _cache.prefetchAdvisoriesAndStatus(
names.map((name) => _newLockFile.packages[name]).nonNulls,
);
}

var changes = 0;
for (final name in names) {
changes += await _reportPackage(name, output) ? 1 : 0;
Expand Down
83 changes: 47 additions & 36 deletions lib/src/source/hosted.dart
Original file line number Diff line number Diff line change
Expand Up @@ -582,18 +582,20 @@ class HostedSource extends CachedSource {
final Map<String, dynamic> body;
final List<Advisory>? result;
try {
bodyText = await withAuthenticatedClient(cache, Uri.parse(hostedUrl), (
client,
) async {
return await retryForHttp(
'fetching advisories for "$packageName" from "$url"',
() async {
final request = http.Request('GET', url);
request.attachPubApiHeaders();
final response = await client.fetch(request);
return response.body;
},
);
bodyText = await cache.hostedCache.pool.withResource(() async {
return await withAuthenticatedClient(cache, Uri.parse(hostedUrl), (
client,
) async {
return await retryForHttp(
'fetching advisories for "$packageName" from "$url"',
() async {
final request = http.Request('GET', url);
request.attachPubApiHeaders();
final response = await client.fetch(request);
return response.body;
},
);
});
});
final decoded = jsonDecode(bodyText);
if (decoded is! Map<String, dynamic>) {
Expand Down Expand Up @@ -774,7 +776,12 @@ class HostedSource extends CachedSource {
final stat = io.File(advisoriesCachePath).statSync();

if (stat.type == io.FileSystemEntityType.file) {
if (advisoriesUpdated.isAfter(stat.modified)) {
// To tolerate timezone differences and clock skew between the local
// machine and the pub.dev server, we add a 24-hour buffer to
// stat.modified.
if (advisoriesUpdated.isAfter(
stat.modified.add(const Duration(hours: 24)),
)) {
tryDeleteEntry(advisoriesCachePath);
return null;
}
Expand All @@ -792,14 +799,7 @@ class HostedSource extends CachedSource {
final parsedCacheAdvisoriesUpdated = DateTime.parse(
cachedAdvisoriesUpdated,
);
final advisoriesUpdated =
(await status(id.toRef(), id.version, cache)).advisoriesUpdated;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we were missing the maxAge


if (
// We could not obtain the timestamp of latest advisory update.
advisoriesUpdated == null ||
// The cached entry is too old.
advisoriesUpdated.isAfter(parsedCacheAdvisoriesUpdated)) {
if (advisoriesUpdated.isAfter(parsedCacheAdvisoriesUpdated)) {
tryDeleteEntry(advisoriesCachePath);
} else {
return _extractAdvisoryDetailsForPackage(doc, id.toRef().name);
Expand All @@ -819,15 +819,6 @@ class HostedSource extends CachedSource {
await _fetchAdvisories(id.toRef(), cache);
}

/// An in-memory cache to store the cached version listing loaded from
/// [_versionListingCachePath].
///
/// Invariant: Entries in this cache are the parsed version of the exact same
/// information cached on disk. I.e. if the entry is present in this cache,
/// there will not be a newer version on disk.
final Map<PackageRef, (DateTime, List<HostedVersionInfo>)> _responseCache =
{};

/// If a cached version listing response for [ref] exists on disk and is less
/// than [maxAge] old it is parsed and returned.
///
Expand All @@ -840,7 +831,7 @@ class HostedSource extends CachedSource {
SystemCache cache, {
Duration? maxAge,
}) async {
final cachedInfo = _responseCache[ref];
final cachedInfo = cache.hostedCache._responseCache[ref];
if (cachedInfo != null) {
final (cacheTimestamp, versionInfo) = cachedInfo;
final cacheAge = DateTime.now().difference(cacheTimestamp);
Expand Down Expand Up @@ -879,7 +870,7 @@ class HostedSource extends CachedSource {
Uri.file(cachePath),
cache,
);
_responseCache[ref] = (parsedTimestamp, res);
cache.hostedCache._responseCache[ref] = (parsedTimestamp, res);
return res;
}
} on io.IOException {
Expand Down Expand Up @@ -931,7 +922,7 @@ class HostedSource extends CachedSource {
);
// Delete the entry in the in-memory cache to maintain the invariant that
// cached information in memory is the same as that on the disk.
_responseCache.remove(ref);
cache.hostedCache._responseCache.remove(ref);
} on io.IOException catch (e) {
// Not being able to write this cache is not fatal. Just move on...
log.fine('Failed writing cache file. $e');
Expand Down Expand Up @@ -1097,7 +1088,10 @@ class HostedSource extends CachedSource {
SystemCache cache,
Duration? maxAge,
) {
return _getAdvisories(id, cache, maxAge);
return cache.hostedCache._advisoriesCache.putIfAbsent(
id,
() => _getAdvisories(id, cache, maxAge),
);
}

@override
Expand Down Expand Up @@ -2181,16 +2175,33 @@ int? _parseCrc32c(Map<String, String> headers, String fileName) {

/// State that is cached for the HostedSource.
final class HostedSourceCache {
/// A pool to rate-limit concurrent network requests to pub.dev.
final Pool pool;

/// The scheduler used to fetch version information for packages.
///
/// This allows rate-limiting requests and speculative pre-fetching of
/// dependencies.
final RateLimitedScheduler<HostedRefAndCache, List<HostedVersionInfo>>
scheduler;

HostedSourceCache(HostedSource hosted)
/// An in-memory cache to store the cached version listing loaded from
/// [HostedSource._versionListingCachePath].
///
/// Invariant: Entries in this cache are the parsed version of the exact same
/// information cached on disk. I.e. if the entry is present in this cache,
/// there will not be a newer version on disk.
final Map<PackageRef, (DateTime, List<HostedVersionInfo>)> _responseCache =
{};

/// An in-memory cache to store the futures of fetched security advisories.
final Map<PackageId, Future<List<Advisory>?>> _advisoriesCache = {};

HostedSourceCache(HostedSource hosted) : this._(hosted, Pool(10));

HostedSourceCache._(HostedSource hosted, this.pool)
: scheduler = RateLimitedScheduler(
(HostedRefAndCache refAndCache) => hosted.fetchVersions(refAndCache),
maxConcurrentOperations: 10,
pool: pool,
);
}
24 changes: 24 additions & 0 deletions lib/src/system_cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,30 @@ Consider setting the `PUB_CACHE` variable manually.
return pubspec;
}

/// Prefetches the status and advisories for [packages] in parallel.
Future<void> prefetchAdvisoriesAndStatus(Iterable<PackageId> packages) async {
final futures = <Future<Object?>>[];
for (final id in packages) {
futures.add(
id.source.status(
id.toRef(),
id.version,
this,
maxAge: const Duration(days: 3),
),
);
final advisoriesFuture = id.source.getAdvisoriesForPackageVersion(
id,
this,
const Duration(days: 3),
);
if (advisoriesFuture != null) {
futures.add(advisoriesFuture);
}
}
await Future.wait(futures);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have concurrency limits here? probably that exists in fetch and schedule logic, just wanted to check.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really good point. Shared the pool with the ratelimitedscheduler

}

/// Get the IDs of all versions that match [ref].
///
/// Note that this does *not* require the packages to be downloaded locally,
Expand Down
41 changes: 41 additions & 0 deletions test/get/hosted/advisory_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,45 @@ Future<void> main() async {
File(p.join(d.sandbox, appPath, 'pubspec.lock')).deleteSync();
await pubGet(args: ['--offline']);
});

test('subsequent pub get reads advisories and status from cache on a new '
'resolution without network calls for advisories', () async {
final server = await servePackages();
server.serve('foo', '1.2.3');

await d.dir(appPath, [
d.pubspec({
'name': 'app',
'dependencies': {'foo': '^1.0.0'},
}),
]).create();

server.addAdvisory(
advisoryId: '123',
displayUrl: 'https://github.com/advisories/123',
affectedPackages: [
AffectedPackage(name: 'foo', versions: ['1.2.3']),
],
);

// First fetch: populates the cache
await pubGet(output: contains('affected by advisory'));

// Mock both endpoints to return 500 Internal Server Error.
// If the client tries to hit the network for status or advisories,
// it will fail and not report the advisory.
server.handle(
'/api/packages/foo/advisories',
(request) => Response.internalServerError(),
);
server.handle(
'/api/packages/foo',
(request) => Response.internalServerError(),
);

// Second fetch: the resolution is up to date, and therefore reused.
// It should read advisories from disk cache and print
// the advisory warning, making zero network requests.
await pubGet(output: contains('affected by advisory'));
});
}
15 changes: 8 additions & 7 deletions test/rate_limited_scheduler_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import 'dart:async';

import 'package:pool/pool.dart';
import 'package:pub/src/rate_limited_scheduler.dart';
import 'package:test/test.dart';

Expand All @@ -24,7 +25,7 @@ void main() {
return i.toUpperCase();
}

final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
final scheduler = RateLimitedScheduler(f, pool: Pool(2));
await scheduler.withPrescheduling((preschedule) async {
preschedule('a');
preschedule('b');
Expand Down Expand Up @@ -53,7 +54,7 @@ void main() {
return i.toUpperCase();
}

final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
final scheduler = RateLimitedScheduler(f, pool: Pool(1));

await scheduler.withPrescheduling((preschedule1) async {
await scheduler.withPrescheduling((preschedule2) async {
Expand Down Expand Up @@ -88,7 +89,7 @@ void main() {
return i.toUpperCase();
}

final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
final scheduler = RateLimitedScheduler(f, pool: Pool(1));

Future? b;
await scheduler.withPrescheduling((preschedule) async {
Expand Down Expand Up @@ -118,7 +119,7 @@ void main() {
return i.toUpperCase();
}

final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
final scheduler = RateLimitedScheduler(f, pool: Pool(2));

completers['a']!.complete();
expect(await scheduler.schedule('a'), 'A');
Expand All @@ -136,7 +137,7 @@ void main() {
return i.toUpperCase();
}

final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
final scheduler = RateLimitedScheduler(f, pool: Pool(1));
await scheduler.withPrescheduling((preschedule) async {
preschedule('a');
preschedule('b');
Expand All @@ -161,7 +162,7 @@ void main() {
return i.toUpperCase();
}

final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
final scheduler = RateLimitedScheduler(f, pool: Pool(2));

await scheduler.withPrescheduling((preschedule) async {
preschedule('a');
Expand Down Expand Up @@ -192,7 +193,7 @@ void main() {
return Zone.current['zoneValue'] as String?;
}

final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
final scheduler = RateLimitedScheduler(f, pool: Pool(2));
await scheduler.withPrescheduling((preschedule) async {
runZoned(() {
preschedule('a');
Expand Down
Loading