Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

[Wip] Differentiate write and write2 + simplify #526

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ abstract public class ListenerPrimary : Listener

// this message is passed to write2 because it must be non-zero-length,
// but it has no other functional significance
private readonly ArraySegment<ArraySegment<byte>> _dummyMessage = new ArraySegment<ArraySegment<byte>>(new[] { new ArraySegment<byte>(new byte[] { 1, 2, 3, 4 }) });
private readonly ArraySegment<byte> _dummyMessage = new ArraySegment<byte>(new byte[] { 1, 2, 3, 4 });

protected ListenerPrimary(ServiceContext serviceContext) : base(serviceContext)
{
Expand Down Expand Up @@ -86,7 +86,7 @@ protected override void DispatchConnection(UvStreamHandle socket)
else
{
var dispatchPipe = _dispatchPipes[index];
var write = new UvWriteReq(Log);
var write = new UvWriteReq2(Log);
write.Init(Thread.Loop);
write.Write2(
dispatchPipe,
Expand Down
70 changes: 11 additions & 59 deletions src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.Extensions.Logging;
Expand All @@ -22,7 +21,9 @@ public class UvWriteReq : UvRequest
private object _state;
private const int BUFFER_COUNT = 4;

private List<GCHandle> _pins = new List<GCHandle>(BUFFER_COUNT + 1);
private GCHandle _pinUvWriteReq;
private GCHandle _pinBufferArray;
private bool _bufferArrayIsPinned;

public UvWriteReq(IKestrelTrace logger) : base(logger)
{
Expand Down Expand Up @@ -50,16 +51,16 @@ public unsafe void Write(
try
{
// add GCHandle to keeps this SafeHandle alive while request processing
_pins.Add(GCHandle.Alloc(this, GCHandleType.Normal));
_pinUvWriteReq = GCHandle.Alloc(this, GCHandleType.Normal);

var pBuffers = (Libuv.uv_buf_t*)_bufs;
if (nBuffers > BUFFER_COUNT)
{
// create and pin buffer array when it's larger than the pre-allocated one
var bufArray = new Libuv.uv_buf_t[nBuffers];
var gcHandle = GCHandle.Alloc(bufArray, GCHandleType.Pinned);
_pins.Add(gcHandle);
pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject();
_pinBufferArray = GCHandle.Alloc(bufArray, GCHandleType.Pinned);
_bufferArrayIsPinned = true;
pBuffers = (Libuv.uv_buf_t*)_pinBufferArray.AddrOfPinnedObject();
}

var block = start.Block;
Expand All @@ -68,7 +69,6 @@ public unsafe void Write(
var blockStart = block == start.Block ? start.Index : block.Data.Offset;
var blockEnd = block == end.Block ? end.Index : block.Data.Offset + block.Data.Count;

// create and pin each segment being written
pBuffers[index] = Libuv.buf_init(
block.Pin() + blockStart,
blockEnd - blockStart);
Expand All @@ -89,69 +89,21 @@ public unsafe void Write(
var block = start.Block;
for (var index = 0; index < nBuffers; index++)
{
block.Unpin();
block = block.Next;
}

throw;
}
}

public unsafe void Write2(
UvStreamHandle handle,
ArraySegment<ArraySegment<byte>> bufs,
UvStreamHandle sendHandle,
Action<UvWriteReq, int, Exception, object> callback,
object state)
{
try
{
// add GCHandle to keeps this SafeHandle alive while request processing
_pins.Add(GCHandle.Alloc(this, GCHandleType.Normal));

var pBuffers = (Libuv.uv_buf_t*)_bufs;
var nBuffers = bufs.Count;
if (nBuffers > BUFFER_COUNT)
{
// create and pin buffer array when it's larger than the pre-allocated one
var bufArray = new Libuv.uv_buf_t[nBuffers];
var gcHandle = GCHandle.Alloc(bufArray, GCHandleType.Pinned);
_pins.Add(gcHandle);
pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject();
}

for (var index = 0; index < nBuffers; index++)
{
// create and pin each segment being written
var buf = bufs.Array[bufs.Offset + index];

var gcHandle = GCHandle.Alloc(buf.Array, GCHandleType.Pinned);
_pins.Add(gcHandle);
pBuffers[index] = Libuv.buf_init(
gcHandle.AddrOfPinnedObject() + buf.Offset,
buf.Count);
}

_callback = callback;
_state = state;
_uv.write2(this, handle, pBuffers, nBuffers, sendHandle, _uv_write_cb);
}
catch
{
_callback = null;
_state = null;
Unpin(this);
throw;
}
}

private static void Unpin(UvWriteReq req)
{
foreach (var pin in req._pins)
req._pinUvWriteReq.Free();
if (req._bufferArrayIsPinned)
{
pin.Free();
req._pinBufferArray.Free();
req._bufferArrayIsPinned = false;
}
req._pins.Clear();
}

private static void UvWriteCb(IntPtr ptr, int status)
Expand Down
114 changes: 114 additions & 0 deletions src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Runtime.InteropServices;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNet.Server.Kestrel.Networking
{
/// <summary>
/// Summary description for UvWriteRequest2
/// </summary>
public class UvWriteReq2 : UvRequest
{
private readonly static Libuv.uv_write_cb _uv_write2_cb = (IntPtr ptr, int status) => UvWrite2Cb(ptr, status);

private IntPtr _bufs;

private Action<UvWriteReq2, int, Exception, object> _callback;
private object _state;
private const int BUFFER_COUNT = 1;

private GCHandle _pinUvWrite2Req;
private GCHandle _pinBuffer;
private bool _bufferIsPinned;

public UvWriteReq2(IKestrelTrace logger) : base(logger)
{
}

public void Init(UvLoopHandle loop)
{
var requestSize = loop.Libuv.req_size(Libuv.RequestType.WRITE);
var bufferSize = Marshal.SizeOf<Libuv.uv_buf_t>() * BUFFER_COUNT;
CreateMemory(
loop.Libuv,
loop.ThreadId,
requestSize + bufferSize);
_bufs = handle + requestSize;
}

public unsafe void Write2(
UvStreamHandle handle,
ArraySegment<byte> buf,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be better to put the dummy buffer in this class and use a static pBuffers object. Better encapsulation probably.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better yet. Create the dummy buffer in this class using CreateMemory. This way the only GCHandle we have to bother with is _pinUvWrite2Req.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made change for this but can't test due to dotnet/aspnetcore#1207

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benaadams Do you still have this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, though will have to add it to PR after Saturday (Playtest this weekend so focused on that)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me. Good luck with the playtest!

UvStreamHandle sendHandle,
Action<UvWriteReq2, int, Exception, object> callback,
object state)
{
try
{
// add GCHandle to keeps this SafeHandle alive while request processing
_pinUvWrite2Req = GCHandle.Alloc(this, GCHandleType.Normal);

var pBuffers = (Libuv.uv_buf_t*)_bufs;
_pinBuffer = GCHandle.Alloc(buf.Array, GCHandleType.Pinned);
_bufferIsPinned = true;

pBuffers[0] = Libuv.buf_init(
_pinBuffer.AddrOfPinnedObject() + buf.Offset,
buf.Count);

_callback = callback;
_state = state;
_uv.write2(this, handle, pBuffers, BUFFER_COUNT, sendHandle, _uv_write2_cb);
}
catch
{
_callback = null;
_state = null;
Unpin(this);
throw;
}
}

private static void Unpin(UvWriteReq2 req)
{
req._pinUvWrite2Req.Free();
if (req._bufferIsPinned)
{
req._pinBuffer.Free();
req._bufferIsPinned = false;
}
}

private static void UvWrite2Cb(IntPtr ptr, int status)
{
var req = FromIntPtr<UvWriteReq2>(ptr);
Unpin(req);

var callback = req._callback;
req._callback = null;

var state = req._state;
req._state = null;

Exception error = null;
if (status < 0)
{
req.Libuv.Check(status, out error);
}

try
{
callback(req, status, error, state);
}
catch (Exception ex)
{
req._log.LogError("UvWrite2Cb", ex);
throw;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ public void ServerPipeDispatchConnections()

serverConnectionPipeAcceptedEvent.WaitOne();

var writeRequest = new UvWriteReq(new KestrelTrace(new TestKestrelTrace()));
var writeRequest = new UvWriteReq2(new KestrelTrace(new TestKestrelTrace()));
writeRequest.Init(loop);
writeRequest.Write2(
serverConnectionPipe,
new ArraySegment<ArraySegment<byte>>(new ArraySegment<byte>[] { new ArraySegment<byte>(new byte[] { 1, 2, 3, 4 }) }),
new ArraySegment<byte>(new byte[] { 1, 2, 3, 4 }),
serverConnectionTcp,
(_3, status2, error2, _4) =>
{
Expand Down