Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 165 additions & 39 deletions Appwrite/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Appwrite.Converters;
using Appwrite.Extensions;
Expand All @@ -26,6 +27,7 @@ public class Client
private string _endpoint;

private static readonly int ChunkSize = 5 * 1024 * 1024;
private static readonly int MaxConcurrentUploads = 8;

public static JsonSerializerOptions DeserializerOptions { get; set; } = new JsonSerializerOptions
{
Expand Down Expand Up @@ -267,14 +269,14 @@ private HttpRequestMessage PrepareRequest(
{
if (header.Key.Equals("content-type", StringComparison.OrdinalIgnoreCase))
{
_http.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(header.Value));
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(header.Value));
}
else
{
if (_http.DefaultRequestHeaders.Contains(header.Key)) {
_http.DefaultRequestHeaders.Remove(header.Key);
if (request.Headers.Contains(header.Key)) {
request.Headers.Remove(header.Key);
}
_http.DefaultRequestHeaders.Add(header.Key, header.Value);
request.Headers.Add(header.Key, header.Value);
}
}

Expand Down Expand Up @@ -509,87 +511,211 @@ public async Task<T> ChunkedUpload<T>(
);
}

var uploadId = string.Empty;

if (!string.IsNullOrEmpty(idParamName))
{
try
{
// Make a request to check if a file already exists
var current = await Call<Dictionary<string, object?>>(
method: "GET",
path: $"{path}/{parameters[idParamName!]}",
new Dictionary<string, string> { { "content-type", "application/json" } },
parameters: new Dictionary<string, object?>()
);
if (current.TryGetValue("chunksUploaded", out var chunksUploadedValue) && chunksUploadedValue != null)
{
offset = Convert.ToInt64(chunksUploadedValue) * ChunkSize;
// Make a request to check if a file already exists
var current = await Call<Dictionary<string, object?>>(
method: "GET",
path: $"{path}/{parameters[idParamName!]}",
new Dictionary<string, string> { { "content-type", "application/json" } },
parameters: new Dictionary<string, object?>()
);
if (current.TryGetValue("chunksUploaded", out var chunksUploadedValue) && chunksUploadedValue != null)
{
offset = Convert.ToInt64(chunksUploadedValue) * ChunkSize;
}
result = current;
uploadId = parameters[idParamName!]?.ToString() ?? string.Empty;
}
}
catch
{
// ignored as it mostly means file not found
}
}

while (offset < size)
var readLock = new object();

async Task<byte[]> ReadChunkAsync(long start, long end)
{
var length = (int)(end - start);
var chunk = new byte[length];

switch(input.SourceType)
{
case "path":
using (var chunkStream = System.IO.File.OpenRead(input.Path))
Comment on lines +540 to +550

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Resource leak for large path-based uploads — the FileStream opened into input.Data at line 459 (info.OpenRead()) is never used or disposed for files larger than ChunkSize. The new concurrent path's ReadChunkAsync opens its own independent FileStream per chunk, completely bypassing input.Data, so the original stream is abandoned for the entire lifetime of the upload. On Windows this will hold a file lock; on any platform it leaks a file descriptor until GC runs a finalizer.

Suggested change
var readLock = new object();
async Task<byte[]> ReadChunkAsync(long start, long end)
{
var length = (int)(end - start);
var chunk = new byte[length];
switch(input.SourceType)
{
case "path":
using (var chunkStream = System.IO.File.OpenRead(input.Path))
// For the "path" case, input.Data was opened to determine size but is
// not used by ReadChunkAsync (which opens fresh handles per chunk).
// Dispose it now to avoid leaking the file descriptor.
if (input.SourceType == "path" && input.Data is IDisposable pathStream)
{
pathStream.Dispose();
input.Data = null;
}
var readLock = new object();
async Task<byte[]> ReadChunkAsync(long start, long end)
{
var length = (int)(end - start);
var chunk = new byte[length];
switch(input.SourceType)
{
case "path":
using (var chunkStream = System.IO.File.OpenRead(input.Path))

{
chunkStream.Seek(start, SeekOrigin.Begin);
var read = 0;
while (read < length)
{
var count = await chunkStream.ReadAsync(chunk, read, length - read);
if (count == 0)
break;
read += count;
}
}
break;
case "stream":
var stream = input.Data as Stream;
if (stream == null)
throw new InvalidOperationException("Stream data is null");
stream.Seek(offset, SeekOrigin.Begin);
await stream.ReadAsync(buffer, 0, ChunkSize);
lock (readLock)
{
stream.Seek(start, SeekOrigin.Begin);
var read = 0;
while (read < length)
{
var count = stream.Read(chunk, read, length - read);
if (count == 0)
break;
read += count;
}
}
break;
Comment on lines 563 to 579

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

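P2 Using lock inside an async method to serialize stream reads is problematic. Holding a Monitor lock across synchronous stream.Read calls on potentially slow I/O blocks the calling thread and can starve the thread pool. SemaphoreSlim with WaitAsync is the idiomatic async-safe alternative. Note that the suggestion below references a streamSemaphore that is not declared anywhere in the visible diff; it would need to be declared alongside (or in place of) readLock, e.g. var streamSemaphore = new SemaphoreSlim(1, 1);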

Suggested change
case "stream":
var stream = input.Data as Stream;
if (stream == null)
throw new InvalidOperationException("Stream data is null");
stream.Seek(offset, SeekOrigin.Begin);
await stream.ReadAsync(buffer, 0, ChunkSize);
lock (readLock)
{
stream.Seek(start, SeekOrigin.Begin);
var read = 0;
while (read < length)
{
var count = stream.Read(chunk, read, length - read);
if (count == 0)
break;
read += count;
}
}
break;
case "stream":
var stream = input.Data as Stream;
if (stream == null)
throw new InvalidOperationException("Stream data is null");
await streamSemaphore.WaitAsync();
try
{
stream.Seek(start, SeekOrigin.Begin);
var read = 0;
while (read < length)
{
var count = await stream.ReadAsync(chunk, read, length - read);
if (count == 0)
break;
read += count;
}
}
finally
{
streamSemaphore.Release();
}
break;

case "bytes":
buffer = ((byte[])input.Data)
.Skip((int)offset)
.Take((int)Math.Min(size - offset, ChunkSize - 1))
.ToArray();
Buffer.BlockCopy((byte[])input.Data, (int)start, chunk, 0, length);
break;
}

var content = new MultipartFormDataContent {
{ new ByteArrayContent(buffer), paramName, input.Filename }
return chunk;
}

async Task<Dictionary<string, object?>> UploadChunkAsync(int index, long start, long end, bool includeUploadId)
{
var chunkHeaders = new Dictionary<string, string>(headers)
{
["Content-Range"] = $"bytes {start}-{end - 1}/{size}"
};

parameters[paramName] = content;
if (includeUploadId && !string.IsNullOrEmpty(uploadId))
{
chunkHeaders["x-appwrite-id"] = uploadId;
}

var content = new MultipartFormDataContent {
{ new ByteArrayContent(await ReadChunkAsync(start, end)), paramName, input.Filename }
};

headers["Content-Range"] =
$"bytes {offset}-{Math.Min(offset + ChunkSize - 1, size - 1)}/{size}";
var chunkParameters = new Dictionary<string, object?>(parameters)
{
[paramName] = content
};

result = await Call<Dictionary<string, object?>>(
var chunkResult = await Call<Dictionary<string, object?>>(
method: "POST",
path,
headers,
parameters
chunkHeaders,
chunkParameters
);

offset += ChunkSize;
if (index == 0 || string.IsNullOrEmpty(uploadId))
{
uploadId = chunkResult.ContainsKey("$id")
? chunkResult["$id"]?.ToString() ?? string.Empty
: string.Empty;
}
Comment on lines +616 to +621

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Unsynchronised write to uploadId across concurrent workers — the check-then-write on string.IsNullOrEmpty(uploadId) and the subsequent assignment are not protected by any lock. If uploadId is still empty when the concurrent phase starts (e.g. idParamName was absent or the GET check failed), multiple workers can race through the condition simultaneously. Any worker that builds its request headers while uploadId is still empty will omit the x-appwrite-id header entirely (UploadChunkAsync only adds the header when uploadId is non-empty), causing the server to treat that chunk as a new independent upload rather than part of the same file.


return chunkResult;
}

if (offset == 0)
{
var firstChunkEnd = Math.Min(ChunkSize, size);
result = await UploadChunkAsync(0, 0, firstChunkEnd, false);
offset = firstChunkEnd;

var id = result.ContainsKey("$id")
? result["$id"]?.ToString() ?? string.Empty
: string.Empty;
var chunksTotal = result.TryGetValue("chunksTotal", out var chunksTotalValue) && chunksTotalValue != null
? Convert.ToInt64(chunksTotalValue)
: 0L;
: (long)Math.Ceiling(size / (double)ChunkSize);
var chunksUploaded = result.TryGetValue("chunksUploaded", out var chunksUploadedValue) && chunksUploadedValue != null
? Convert.ToInt64(chunksUploadedValue)
: 0L;

headers["x-appwrite-id"] = id;

onProgress?.Invoke(
new UploadProgress(
id: id,
progress: Math.Min(offset, size) / size * 100,
id: uploadId,
progress: Math.Min(offset, size) / (double)size * 100,
sizeUploaded: Math.Min(offset, size),
chunksTotal: chunksTotal,
chunksUploaded: chunksUploaded));
}

var chunks = new List<(int Index, long Start, long End)>();
var chunkOffset = offset;
var totalChunks = (long)Math.Ceiling(size / (double)ChunkSize);
while (chunkOffset < size)
{
var end = Math.Min(chunkOffset + ChunkSize, size);
chunks.Add(((int)(chunkOffset / ChunkSize), chunkOffset, end));
chunkOffset = end;
}

if (chunks.Count > 0)
{
var nextChunk = -1;
var completedChunks = (long)(offset / ChunkSize);
var uploadedBytes = offset;
var resultLock = new object();

// Reports whether a chunk response says that every chunk has been received.
// A response with a missing or null "chunksUploaded" entry is treated as "not complete".
bool IsUploadComplete(Dictionary<string, object?> chunkResult)
{
    var hasUploadedCount = chunkResult.TryGetValue("chunksUploaded", out var uploadedRaw);
    if (!hasUploadedCount || uploadedRaw == null)
    {
        return false;
    }

    var uploadedCount = Convert.ToInt64(uploadedRaw);

    // Use the "chunksTotal" reported in the response when present and non-null;
    // otherwise fall back to the totalChunks value captured from the enclosing method.
    long expectedCount;
    if (chunkResult.TryGetValue("chunksTotal", out var totalRaw) && totalRaw != null)
    {
        expectedCount = Convert.ToInt64(totalRaw);
    }
    else
    {
        expectedCount = totalChunks;
    }

    return uploadedCount >= expectedCount;
}

var workers = Enumerable.Range(0, Math.Min(MaxConcurrentUploads, chunks.Count))
.Select(async _ =>
{
while (true)
{
var chunkIndex = Interlocked.Increment(ref nextChunk);
if (chunkIndex >= chunks.Count)
break;

var chunk = chunks[chunkIndex];
var chunkResult = await UploadChunkAsync(chunk.Index, chunk.Start, chunk.End, true);

if (IsUploadComplete(chunkResult))
{
lock (resultLock)
{
result = chunkResult;
}
}

var chunksUploaded = Interlocked.Increment(ref completedChunks);
var sizeUploaded = Interlocked.Add(ref uploadedBytes, chunk.End - chunk.Start);
var chunksTotal = chunkResult.TryGetValue("chunksTotal", out var chunksTotalValue) && chunksTotalValue != null
? Convert.ToInt64(chunksTotalValue)
: totalChunks;

onProgress?.Invoke(
new UploadProgress(
id: uploadId,
progress: Math.Min(sizeUploaded, size) / (double)size * 100,
sizeUploaded: Math.Min(sizeUploaded, size),
chunksTotal: chunksTotal,
chunksUploaded: chunksUploaded));
}
});

await Task.WhenAll(workers);
}

// Convert to non-nullable dictionary for converter
var nonNullableResult = result.Where(kvp => kvp.Value != null)
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value!);
Expand Down
3 changes: 0 additions & 3 deletions Appwrite/Enums/BuildRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ public BuildRuntime(string value)
public static BuildRuntime PythonMl311 => new BuildRuntime("python-ml-3.11");
public static BuildRuntime PythonMl312 => new BuildRuntime("python-ml-3.12");
public static BuildRuntime PythonMl313 => new BuildRuntime("python-ml-3.13");
public static BuildRuntime Deno121 => new BuildRuntime("deno-1.21");
public static BuildRuntime Deno124 => new BuildRuntime("deno-1.24");
public static BuildRuntime Deno135 => new BuildRuntime("deno-1.35");
public static BuildRuntime Deno140 => new BuildRuntime("deno-1.40");
public static BuildRuntime Deno146 => new BuildRuntime("deno-1.46");
public static BuildRuntime Deno20 => new BuildRuntime("deno-2.0");
Expand Down
3 changes: 0 additions & 3 deletions Appwrite/Enums/Runtime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ public Runtime(string value)
public static Runtime PythonMl311 => new Runtime("python-ml-3.11");
public static Runtime PythonMl312 => new Runtime("python-ml-3.12");
public static Runtime PythonMl313 => new Runtime("python-ml-3.13");
public static Runtime Deno121 => new Runtime("deno-1.21");
public static Runtime Deno124 => new Runtime("deno-1.24");
public static Runtime Deno135 => new Runtime("deno-1.35");
public static Runtime Deno140 => new Runtime("deno-1.40");
public static Runtime Deno146 => new Runtime("deno-1.46");
public static Runtime Deno20 => new Runtime("deno-2.0");
Expand Down
Loading
Loading