feat: support concurrent chunk uploads#94
Conversation
Greptile SummaryThis PR refactors the chunked upload path in
Confidence Score: 3/5The concurrent upload logic introduces a genuine file-handle leak that will affect any caller uploading a large file by path, and a shared-variable write that is not guarded by a lock. For large path-based uploads, info.OpenRead() creates a FileStream stored in input.Data that is immediately orphaned — ReadChunkAsync opens independent handles per chunk and never touches input.Data. The leaked handle persists until GC runs a finalizer, holding a file lock on Windows. Separately, uploadId is written inside UploadChunkAsync without synchronisation; a narrow window where it is still empty when concurrent workers start allows multiple workers to race the assignment, potentially sending chunks without the correct ID. Appwrite/Client.cs — the concurrent upload path around ReadChunkAsync and UploadChunkAsync. Important Files Changed
Reviews (1): Last reviewed commit: "feat: support concurrent chunk uploads" | Re-trigger Greptile |
| var readLock = new object(); | ||
|
|
||
| async Task<byte[]> ReadChunkAsync(long start, long end) | ||
| { | ||
| var length = (int)(end - start); | ||
| var chunk = new byte[length]; | ||
|
|
||
| switch(input.SourceType) | ||
| { | ||
| case "path": | ||
| using (var chunkStream = System.IO.File.OpenRead(input.Path)) |
There was a problem hiding this comment.
Resource leak for large path-based uploads — the
FileStream opened into input.Data at line 459 (info.OpenRead()) is never used or disposed for files larger than ChunkSize. The new concurrent path's ReadChunkAsync opens its own independent FileStream per chunk, completely bypassing input.Data, so the original stream is abandoned for the entire lifetime of the upload. On Windows this will hold a file lock; on any platform it leaks a file descriptor until GC runs a finalizer.
| var readLock = new object(); | |
| async Task<byte[]> ReadChunkAsync(long start, long end) | |
| { | |
| var length = (int)(end - start); | |
| var chunk = new byte[length]; | |
| switch(input.SourceType) | |
| { | |
| case "path": | |
| using (var chunkStream = System.IO.File.OpenRead(input.Path)) | |
| // For the "path" case, input.Data was opened to determine size but is | |
| // not used by ReadChunkAsync (which opens fresh handles per chunk). | |
| // Dispose it now to avoid leaking the file descriptor. | |
| if (input.SourceType == "path" && input.Data is IDisposable pathStream) | |
| { | |
| pathStream.Dispose(); | |
| input.Data = null; | |
| } | |
| var readLock = new object(); | |
| async Task<byte[]> ReadChunkAsync(long start, long end) | |
| { | |
| var length = (int)(end - start); | |
| var chunk = new byte[length]; | |
| switch(input.SourceType) | |
| { | |
| case "path": | |
| using (var chunkStream = System.IO.File.OpenRead(input.Path)) |
| if (index == 0 || string.IsNullOrEmpty(uploadId)) | ||
| { | ||
| uploadId = chunkResult.ContainsKey("$id") | ||
| ? chunkResult["$id"]?.ToString() ?? string.Empty | ||
| : string.Empty; | ||
| } |
There was a problem hiding this comment.
Unsynchronised write to
uploadId across concurrent workers — the check-then-write on string.IsNullOrEmpty(uploadId) and the subsequent assignment are not protected by any lock. If uploadId is still empty when the concurrent phase starts (e.g. idParamName was absent or the GET check failed), multiple workers can race through the condition simultaneously. Any worker that reads uploadId between another worker's condition check and its assignment will pass an empty x-appwrite-id header, causing the server to treat that chunk as a new independent upload rather than part of the same file.
| case "stream": | ||
| var stream = input.Data as Stream; | ||
| if (stream == null) | ||
| throw new InvalidOperationException("Stream data is null"); | ||
| stream.Seek(offset, SeekOrigin.Begin); | ||
| await stream.ReadAsync(buffer, 0, ChunkSize); | ||
| lock (readLock) | ||
| { | ||
| stream.Seek(start, SeekOrigin.Begin); | ||
| var read = 0; | ||
| while (read < length) | ||
| { | ||
| var count = stream.Read(chunk, read, length - read); | ||
| if (count == 0) | ||
| break; | ||
| read += count; | ||
| } | ||
| } | ||
| break; |
There was a problem hiding this comment.
lock used inside an async method to serialize stream reads is problematic. Holding a Monitor lock across synchronous stream.Read calls on potentially slow I/O can starve the thread pool. SemaphoreSlim with WaitAsync is the idiomatic async-safe alternative.
| case "stream": | |
| var stream = input.Data as Stream; | |
| if (stream == null) | |
| throw new InvalidOperationException("Stream data is null"); | |
| stream.Seek(offset, SeekOrigin.Begin); | |
| await stream.ReadAsync(buffer, 0, ChunkSize); | |
| lock (readLock) | |
| { | |
| stream.Seek(start, SeekOrigin.Begin); | |
| var read = 0; | |
| while (read < length) | |
| { | |
| var count = stream.Read(chunk, read, length - read); | |
| if (count == 0) | |
| break; | |
| read += count; | |
| } | |
| } | |
| break; | |
| case "stream": | |
| var stream = input.Data as Stream; | |
| if (stream == null) | |
| throw new InvalidOperationException("Stream data is null"); | |
| await streamSemaphore.WaitAsync(); | |
| try | |
| { | |
| stream.Seek(start, SeekOrigin.Begin); | |
| var read = 0; | |
| while (read < length) | |
| { | |
| var count = await stream.ReadAsync(chunk, read, length - read); | |
| if (count == 0) | |
| break; | |
| read += count; | |
| } | |
| } | |
| finally | |
| { | |
| streamSemaphore.Release(); | |
| } | |
| break; |
|
Closing in favor of focused PR #95. |
This PR updates the SDK to support concurrent chunk uploads.