Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Consume body without allocating or reading #335

Merged
merged 6 commits into from
Nov 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public override void Flush()

private Task<int> ReadAsync(ArraySegment<byte> buffer)
{
return _input.ReadAsync(buffer);
return _input.ReadAsync(buffer.Array, buffer.Offset, buffer.Count);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth just overloading this on SocketInput? Notice a bunch of other methods here create an array segment to call this method.

At least least, you could clean all of those up to avoid creating the ArraySegment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't create an ArraySegement with a null buffer + specified length or it will throw an exception.

However a consumption limit needs to be passed through to CopyTo else it will consume the whole input rather than just the input related to the current request. Is a bit of abstraction leakiness to make the code reusable; rather than copy paste repeated...

}
}
}
6 changes: 2 additions & 4 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,8 @@ public async Task RequestProcessingAsync()

await ProduceEnd();

while (await RequestBody.ReadAsync(_nullBuffer, 0, _nullBuffer.Length) != 0)
{
// Finish reading the request body in case the app did not.
}
// Finish reading the request body in case the app did not.
await MessageBody.Consume();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💃

}

terminated = !_keepAlive;
Expand Down
40 changes: 35 additions & 5 deletions src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,37 @@ protected MessageBody(FrameContext context)
return result;
}

public async Task Consume(CancellationToken cancellationToken = default(CancellationToken))
{
Task<int> result;
var send100checked = false;
do
{
result = ReadAsyncImplementation(default(ArraySegment<byte>), cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not await here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was copying the function above. Following the logic of that function; the task hasn't completed because no data; check whether their is no data because the client is waiting for a 100 continue else the await will block infinitely as it waiting for data that's dependent on a send; if so do the send and return the incomplete task?

Since we are doing the full loop here; if we pass over that condition; we know the task has already completed. If the result is 0 then return as everything is done; otherwise move to next loop, no point in checking twice with the await for a condition we already know. Therefore we only await if the task is not complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also Interlocked is much more expensive than a local var and only one 100 continue is sent, so only check it once, as we know the results for every loop after that. Though firstLoop is probably the wrong name.

if (!result.IsCompleted)
{
if (!send100checked)
{
if (Interlocked.Exchange(ref _send100Continue, 0) == 1)
{
_context.FrameControl.ProduceContinue();
}
send100checked = true;
}
}
else if (result.GetAwaiter().GetResult() == 0)
{
// Completed Task, end of stream
return;
}
else
{
// Completed Task, get next Task rather than await
continue;
}
} while (await result != 0);
}

public abstract Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken);

public static MessageBody For(
Expand Down Expand Up @@ -108,7 +139,7 @@ public ForRemainingData(FrameContext context)

public override Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
return _context.SocketInput.ReadAsync(buffer);
return _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, buffer.Array == null ? 8192 : buffer.Count);
}
}

Expand All @@ -129,14 +160,13 @@ public override async Task<int> ReadAsyncImplementation(ArraySegment<byte> buffe
{
var input = _context.SocketInput;

var limit = Math.Min(buffer.Count, _inputLength);
var limit = buffer.Array == null ? _inputLength : Math.Min(buffer.Count, _inputLength);
if (limit == 0)
{
return 0;
}

var limitedBuffer = new ArraySegment<byte>(buffer.Array, buffer.Offset, limit);
var actual = await _context.SocketInput.ReadAsync(limitedBuffer);
var actual = await _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, limit);
_inputLength -= actual;

if (actual == 0)
Expand Down Expand Up @@ -189,7 +219,7 @@ public override async Task<int> ReadAsyncImplementation(ArraySegment<byte> buffe
}
while (_mode == Mode.ChunkData)
{
var limit = Math.Min(buffer.Count, _inputLength);
var limit = buffer.Array == null ? _inputLength : Math.Min(buffer.Count, _inputLength);
if (limit != 0)
{
await input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
public static class SocketInputExtensions
{
public static async Task<int> ReadAsync(this SocketInput input, ArraySegment<byte> buffer)
public static async Task<int> ReadAsync(this SocketInput input, byte[] buffer, int offset, int count)
{
while (true)
{
await input;

var begin = input.ConsumingStart();
int actual;
var end = begin.CopyTo(buffer.Array, buffer.Offset, buffer.Count, out actual);
var end = begin.CopyTo(buffer, offset, count, out actual);
input.ConsumingComplete(end, end);

if (actual != 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,18 +600,27 @@ public MemoryPoolIterator2 CopyTo(byte[] array, int offset, int count, out int a
if (remaining <= following)
{
actual = count;
Buffer.BlockCopy(block.Array, index, array, offset, remaining);
if (array != null)
{
Buffer.BlockCopy(block.Array, index, array, offset, remaining);
}
return new MemoryPoolIterator2(block, index + remaining);
}
else if (block.Next == null)
{
actual = count - remaining + following;
Buffer.BlockCopy(block.Array, index, array, offset, following);
if (array != null)
{
Buffer.BlockCopy(block.Array, index, array, offset, following);
}
return new MemoryPoolIterator2(block, index + following);
}
else
{
Buffer.BlockCopy(block.Array, index, array, offset, following);
if (array != null)
{
Buffer.BlockCopy(block.Array, index, array, offset, following);
}
offset += following;
remaining -= following;
block = block.Next;
Expand Down