Skip to content

Commit 96a4481

Browse files
Tyrie VellaCopilot
andcommitted
Coalesce concurrent download requests for the same object
When multiple git.exe processes request the same loose object simultaneously via named pipes, GVFS previously started independent HTTP downloads for each request. Under memory pressure, this caused 48K+ redundant requests, draining the connection pool and effectively hanging the mount process. Add a ConcurrentDictionary<string, Lazy<DownloadAndSaveObjectResult>> to GVFSGitObjects that ensures only one download runs per objectId at a time. Concurrent callers for the same SHA share the result of the single in-flight download. The entry is removed after completion so subsequent requests get fresh download attempts. Work item: 60167591 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 44b6d6b commit 96a4481

2 files changed

Lines changed: 254 additions & 1 deletion

File tree

GVFS/GVFS.Common/Git/GVFSGitObjects.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ public class GVFSGitObjects : GitObjects
1414
private static readonly TimeSpan NegativeCacheTTL = TimeSpan.FromSeconds(30);
1515

1616
private ConcurrentDictionary<string, DateTime> objectNegativeCache;
17+
private ConcurrentDictionary<string, Lazy<DownloadAndSaveObjectResult>> inflightDownloads;
1718

1819
public GVFSGitObjects(GVFSContext context, GitObjectsHttpRequestor objectRequestor)
1920
: base(context.Tracer, context.Enlistment, objectRequestor, context.FileSystem)
2021
{
2122
this.Context = context;
2223
this.objectNegativeCache = new ConcurrentDictionary<string, DateTime>(StringComparer.OrdinalIgnoreCase);
24+
this.inflightDownloads = new ConcurrentDictionary<string, Lazy<DownloadAndSaveObjectResult>>(StringComparer.OrdinalIgnoreCase);
2325
}
2426

2527
public enum RequestSource
@@ -127,6 +129,40 @@ private DownloadAndSaveObjectResult TryDownloadAndSaveObject(
127129
this.objectNegativeCache.TryRemove(objectId, out negativeCacheRequestTime);
128130
}
129131

132+
// Coalesce concurrent requests for the same objectId so that only one HTTP
133+
// download runs per SHA at a time. All concurrent callers share the result.
134+
// Note: the first caller's cancellationToken and retryOnFailure settings are
135+
// captured by the Lazy factory. Subsequent coalesced callers inherit those
136+
// settings. In practice this is fine because the primary concurrent path
137+
// (NamedPipeMessage from git.exe) always uses CancellationToken.None.
138+
Lazy<DownloadAndSaveObjectResult> newLazy = new Lazy<DownloadAndSaveObjectResult>(
139+
() => this.DoDownloadAndSaveObject(objectId, cancellationToken, requestSource, retryOnFailure));
140+
Lazy<DownloadAndSaveObjectResult> lazy = this.inflightDownloads.GetOrAdd(objectId, newLazy);
141+
142+
if (!ReferenceEquals(lazy, newLazy))
143+
{
144+
EventMetadata metadata = new EventMetadata();
145+
metadata.Add("objectId", objectId);
146+
metadata.Add("requestSource", requestSource.ToString());
147+
this.Context.Tracer.RelatedEvent(EventLevel.Informational, "TryDownloadAndSaveObject_CoalescedRequest", metadata);
148+
}
149+
150+
try
151+
{
152+
return lazy.Value;
153+
}
154+
finally
155+
{
156+
this.inflightDownloads.TryRemove(objectId, out _);
157+
}
158+
}
159+
160+
private DownloadAndSaveObjectResult DoDownloadAndSaveObject(
161+
string objectId,
162+
CancellationToken cancellationToken,
163+
RequestSource requestSource,
164+
bool retryOnFailure)
165+
{
130166
// To reduce allocations, reuse the same buffer when writing objects in this batch
131167
byte[] bufToCopyWith = new byte[StreamUtil.DefaultCopyBufferSize];
132168

GVFS/GVFS.UnitTests/Git/GVFSGitObjectsTests.cs

Lines changed: 218 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,157 @@ public void FailsNullBytePackDownloads()
118118
gitObjects => gitObjects.TryDownloadCommit("object0"));
119119
}
120120

121+
[TestCase]
122+
public void CoalescesMultipleConcurrentRequestsForSameObject()
123+
{
124+
ManualResetEventSlim downloadStarted = new ManualResetEventSlim(false);
125+
ManualResetEventSlim downloadGate = new ManualResetEventSlim(false);
126+
int downloadCount = 0;
127+
128+
CoalescingTestHttpGitObjects httpObjects = new CoalescingTestHttpGitObjects(
129+
this.validTestObjectFileContents,
130+
onDownloadStarting: () =>
131+
{
132+
Interlocked.Increment(ref downloadCount);
133+
downloadStarted.Set();
134+
downloadGate.Wait();
135+
});
136+
137+
MockFileSystemWithCallbacks fileSystem = new MockFileSystemWithCallbacks();
138+
fileSystem.OnFileExists = (path) => false;
139+
fileSystem.OnMoveFile = (source, target) => { };
140+
fileSystem.OnOpenFileStream = (path, mode, access) =>
141+
{
142+
if (access == FileAccess.Read)
143+
{
144+
return new MemoryStream(this.validTestObjectFileContents);
145+
}
146+
147+
return new MemoryStream();
148+
};
149+
150+
GVFSGitObjects dut = this.CreateTestableGVFSGitObjects(httpObjects, fileSystem);
151+
152+
const int threadCount = 10;
153+
GitObjects.DownloadAndSaveObjectResult[] results = new GitObjects.DownloadAndSaveObjectResult[threadCount];
154+
Thread[] threads = new Thread[threadCount];
155+
CountdownEvent allReady = new CountdownEvent(threadCount);
156+
ManualResetEventSlim go = new ManualResetEventSlim(false);
157+
158+
for (int i = 0; i < threadCount; i++)
159+
{
160+
int idx = i;
161+
threads[i] = new Thread(() =>
162+
{
163+
allReady.Signal();
164+
go.Wait();
165+
results[idx] = dut.TryDownloadAndSaveObject(
166+
ValidTestObjectFileSha1,
167+
GVFSGitObjects.RequestSource.NamedPipeMessage);
168+
});
169+
threads[i].Start();
170+
}
171+
172+
// Release all threads simultaneously
173+
allReady.Wait();
174+
go.Set();
175+
176+
// Wait for the first download to start (proves one thread entered the factory)
177+
downloadStarted.Wait(TimeSpan.FromSeconds(5)).ShouldBeTrue("Download should have started");
178+
179+
// Give other threads time to pile up on the Lazy<T>
180+
Thread.Sleep(200);
181+
182+
// Release the download
183+
downloadGate.Set();
184+
185+
// Wait for all threads
186+
foreach (Thread t in threads)
187+
{
188+
t.Join(TimeSpan.FromSeconds(10)).ShouldBeTrue("Thread should complete");
189+
}
190+
191+
// Only one download should have occurred
192+
downloadCount.ShouldEqual(1);
193+
194+
// All threads should have gotten Success
195+
foreach (GitObjects.DownloadAndSaveObjectResult result in results)
196+
{
197+
result.ShouldEqual(GitObjects.DownloadAndSaveObjectResult.Success);
198+
}
199+
}
200+
201+
[TestCase]
202+
public void DifferentObjectsAreNotCoalesced()
203+
{
204+
string secondSha = "b376885ac8452b6cbf9ced81b1080bfd570d9b91";
205+
int downloadCount = 0;
206+
207+
CoalescingTestHttpGitObjects httpObjects = new CoalescingTestHttpGitObjects(
208+
this.validTestObjectFileContents,
209+
onDownloadStarting: () => Interlocked.Increment(ref downloadCount));
210+
211+
MockFileSystemWithCallbacks fileSystem = new MockFileSystemWithCallbacks();
212+
fileSystem.OnFileExists = (path) => false;
213+
fileSystem.OnMoveFile = (source, target) => { };
214+
fileSystem.OnOpenFileStream = (path, mode, access) =>
215+
{
216+
if (access == FileAccess.Read)
217+
{
218+
return new MemoryStream(this.validTestObjectFileContents);
219+
}
220+
221+
return new MemoryStream();
222+
};
223+
224+
GVFSGitObjects dut = this.CreateTestableGVFSGitObjects(httpObjects, fileSystem);
225+
226+
dut.TryDownloadAndSaveObject(ValidTestObjectFileSha1, GVFSGitObjects.RequestSource.NamedPipeMessage)
227+
.ShouldEqual(GitObjects.DownloadAndSaveObjectResult.Success);
228+
229+
dut.TryDownloadAndSaveObject(secondSha, GVFSGitObjects.RequestSource.NamedPipeMessage)
230+
.ShouldEqual(GitObjects.DownloadAndSaveObjectResult.Success);
231+
232+
downloadCount.ShouldEqual(2);
233+
}
234+
235+
[TestCase]
236+
public void FailedDownloadAllowsSubsequentRetry()
237+
{
238+
int downloadCount = 0;
239+
240+
CoalescingTestHttpGitObjects httpObjects = new CoalescingTestHttpGitObjects(
241+
this.validTestObjectFileContents,
242+
onDownloadStarting: () => Interlocked.Increment(ref downloadCount),
243+
failUntilAttempt: 2);
244+
245+
MockFileSystemWithCallbacks fileSystem = new MockFileSystemWithCallbacks();
246+
fileSystem.OnFileExists = (path) => false;
247+
fileSystem.OnMoveFile = (source, target) => { };
248+
fileSystem.OnOpenFileStream = (path, mode, access) =>
249+
{
250+
if (access == FileAccess.Read)
251+
{
252+
return new MemoryStream(this.validTestObjectFileContents);
253+
}
254+
255+
return new MemoryStream();
256+
};
257+
258+
GVFSGitObjects dut = this.CreateTestableGVFSGitObjects(httpObjects, fileSystem);
259+
260+
// First attempt fails
261+
dut.TryDownloadAndSaveObject(ValidTestObjectFileSha1, GVFSGitObjects.RequestSource.NamedPipeMessage)
262+
.ShouldEqual(GitObjects.DownloadAndSaveObjectResult.Error);
263+
264+
// Second attempt should start a new download (not reuse cached failure)
265+
dut.TryDownloadAndSaveObject(ValidTestObjectFileSha1, GVFSGitObjects.RequestSource.NamedPipeMessage)
266+
.ShouldEqual(GitObjects.DownloadAndSaveObjectResult.Success);
267+
268+
// Two separate downloads should have occurred
269+
downloadCount.ShouldEqual(2);
270+
}
271+
121272
private void AssertRetryableExceptionOnDownload(
122273
MemoryStream inputStream,
123274
string mediaType,
@@ -140,7 +291,7 @@ private void AssertRetryableExceptionOnDownload(
140291
}
141292
}
142293

143-
private GVFSGitObjects CreateTestableGVFSGitObjects(MockHttpGitObjects httpObjects, MockFileSystemWithCallbacks fileSystem)
294+
private GVFSGitObjects CreateTestableGVFSGitObjects(GitObjectsHttpRequestor httpObjects, MockFileSystemWithCallbacks fileSystem)
144295
{
145296
MockTracer tracer = new MockTracer();
146297
GVFSEnlistment enlistment = new GVFSEnlistment(TestEnlistmentRoot, "https://fakeRepoUrl", "fakeGitBinPath", authentication: null);
@@ -224,5 +375,71 @@ public UnsafeGVFSGitObjects(GVFSContext context, GitObjectsHttpRequestor objectR
224375
this.checkData = false;
225376
}
226377
}
378+
379+
private class CoalescingTestHttpGitObjects : GitObjectsHttpRequestor
380+
{
381+
private readonly byte[] objectContents;
382+
private readonly Action onDownloadStarting;
383+
private readonly int failUntilAttempt;
384+
private int attemptCount;
385+
386+
public CoalescingTestHttpGitObjects(byte[] objectContents, Action onDownloadStarting, int failUntilAttempt = 0)
387+
: this(new MockGVFSEnlistment(), objectContents, onDownloadStarting, failUntilAttempt)
388+
{
389+
}
390+
391+
private CoalescingTestHttpGitObjects(MockGVFSEnlistment enlistment, byte[] objectContents, Action onDownloadStarting, int failUntilAttempt)
392+
: base(new MockTracer(), enlistment, new MockCacheServerInfo(), new RetryConfig(maxRetries: 1))
393+
{
394+
this.objectContents = objectContents;
395+
this.onDownloadStarting = onDownloadStarting;
396+
this.failUntilAttempt = failUntilAttempt;
397+
}
398+
399+
public override RetryWrapper<GitObjectTaskResult>.InvocationResult TryDownloadLooseObject(
400+
string objectId,
401+
bool retryOnFailure,
402+
CancellationToken cancellationToken,
403+
string requestSource,
404+
Func<int, GitEndPointResponseData, RetryWrapper<GitObjectTaskResult>.CallbackResult> onSuccess)
405+
{
406+
this.onDownloadStarting?.Invoke();
407+
408+
int attempt = Interlocked.Increment(ref this.attemptCount);
409+
if (attempt < this.failUntilAttempt)
410+
{
411+
GitObjectTaskResult failResult = new GitObjectTaskResult(false);
412+
return new RetryWrapper<GitObjectTaskResult>.InvocationResult(0, false, failResult);
413+
}
414+
415+
using (MemoryStream stream = new MemoryStream(this.objectContents))
416+
using (GitEndPointResponseData response = new GitEndPointResponseData(
417+
HttpStatusCode.OK,
418+
GVFSConstants.MediaTypes.LooseObjectMediaType,
419+
stream,
420+
message: null,
421+
onResponseDisposed: null))
422+
{
423+
onSuccess(0, response);
424+
}
425+
426+
GitObjectTaskResult result = new GitObjectTaskResult(true);
427+
return new RetryWrapper<GitObjectTaskResult>.InvocationResult(0, true, result);
428+
}
429+
430+
public override RetryWrapper<GitObjectTaskResult>.InvocationResult TryDownloadObjects(
431+
IEnumerable<string> objectIds,
432+
Func<int, GitEndPointResponseData, RetryWrapper<GitObjectTaskResult>.CallbackResult> onSuccess,
433+
Action<RetryWrapper<GitObjectTaskResult>.ErrorEventArgs> onFailure,
434+
bool preferBatchedLooseObjects)
435+
{
436+
throw new NotImplementedException();
437+
}
438+
439+
public override List<GitObjectSize> QueryForFileSizes(IEnumerable<string> objectIds, CancellationToken cancellationToken)
440+
{
441+
throw new NotImplementedException();
442+
}
443+
}
227444
}
228445
}

0 commit comments

Comments
 (0)