Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Commit a696eb8

Browse files
committed
Merge branch 'benaadams/pool-writecontexts' into dev
2 parents b143557 + 5665eba commit a696eb8

File tree

7 files changed

+145
-44
lines changed

7 files changed

+145
-44
lines changed

src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context
4242
_connectionId = Interlocked.Increment(ref _lastConnectionId);
4343

4444
_rawSocketInput = new SocketInput(Memory2, ThreadPool);
45-
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool);
45+
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool);
4646
}
4747

4848
public void Start()

src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ public void Dispose()
9494
{
9595
var socket = (Listener)tcs2.Task.AsyncState;
9696
socket.ListenSocket.Dispose();
97+
98+
var writeReqPool = socket.WriteReqPool;
99+
while (writeReqPool.Count > 0)
100+
{
101+
writeReqPool.Dequeue().Dispose();
102+
}
103+
97104
tcs2.SetResult(0);
98105
}
99106
catch (Exception ex)

src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4-
using Microsoft.AspNet.Http;
4+
using System.Collections.Generic;
55
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
6+
using Microsoft.AspNet.Server.Kestrel.Networking;
67

78
namespace Microsoft.AspNet.Server.Kestrel.Http
89
{
@@ -17,6 +18,7 @@ public ListenerContext(ServiceContext serviceContext)
1718
: base(serviceContext)
1819
{
1920
Memory2 = new MemoryPool2();
21+
WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs);
2022
}
2123

2224
public ListenerContext(ListenerContext listenerContext)
@@ -25,6 +27,7 @@ public ListenerContext(ListenerContext listenerContext)
2527
ServerAddress = listenerContext.ServerAddress;
2628
Thread = listenerContext.Thread;
2729
Memory2 = listenerContext.Memory2;
30+
WriteReqPool = listenerContext.WriteReqPool;
2831
Log = listenerContext.Log;
2932
}
3033

@@ -33,5 +36,7 @@ public ListenerContext(ListenerContext listenerContext)
3336
public KestrelThread Thread { get; set; }
3437

3538
public MemoryPool2 Memory2 { get; set; }
39+
40+
public Queue<UvWriteReq> WriteReqPool { get; set; }
3641
}
3742
}

src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs

Lines changed: 95 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
1414
{
1515
public class SocketOutput : ISocketOutput
1616
{
17+
public const int MaxPooledWriteReqs = 1024;
18+
1719
private const int _maxPendingWrites = 3;
1820
private const int _maxBytesPreCompleted = 65536;
1921
private const int _initialTaskQueues = 64;
22+
private const int _maxPooledWriteContexts = 32;
2023

2124
private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);
2225

@@ -42,12 +45,13 @@ public class SocketOutput : ISocketOutput
4245
// The number of write operations that have been scheduled so far
4346
// but have not completed.
4447
private int _writesPending = 0;
45-
4648
private int _numBytesPreCompleted = 0;
4749
private Exception _lastWriteError;
4850
private WriteContext _nextWriteContext;
4951
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
5052
private readonly Queue<TaskCompletionSource<object>> _tasksCompleted;
53+
private readonly Queue<WriteContext> _writeContextPool;
54+
private readonly Queue<UvWriteReq> _writeReqPool;
5155

5256
public SocketOutput(
5357
KestrelThread thread,
@@ -56,7 +60,8 @@ public SocketOutput(
5660
Connection connection,
5761
long connectionId,
5862
IKestrelTrace log,
59-
IThreadPool threadPool)
63+
IThreadPool threadPool,
64+
Queue<UvWriteReq> writeReqPool)
6065
{
6166
_thread = thread;
6267
_socket = socket;
@@ -66,6 +71,8 @@ public SocketOutput(
6671
_threadPool = threadPool;
6772
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
6873
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
74+
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
75+
_writeReqPool = writeReqPool;
6976

7077
_head = memory.Lease();
7178
_tail = _head;
@@ -92,7 +99,14 @@ public Task WriteAsync(
9299
{
93100
if (_nextWriteContext == null)
94101
{
95-
_nextWriteContext = new WriteContext(this);
102+
if (_writeContextPool.Count > 0)
103+
{
104+
_nextWriteContext = _writeContextPool.Dequeue();
105+
}
106+
else
107+
{
108+
_nextWriteContext = new WriteContext(this);
109+
}
96110
}
97111

98112
if (socketShutdownSend)
@@ -274,9 +288,12 @@ private void WriteAllPending()
274288
}
275289

276290
// This is called on the libuv event loop
277-
private void OnWriteCompleted(int bytesWritten, int status, Exception error)
291+
private void OnWriteCompleted(WriteContext writeContext)
278292
{
279-
_log.ConnectionWriteCallback(_connectionId, status);
293+
var bytesWritten = writeContext.ByteCount;
294+
var status = writeContext.WriteStatus;
295+
var error = writeContext.WriteError;
296+
280297

281298
if (error != null)
282299
{
@@ -290,6 +307,7 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error)
290307

291308
lock (_contextLock)
292309
{
310+
PoolWriteContext(writeContext);
293311
if (_nextWriteContext != null)
294312
{
295313
scheduleWrite = true;
@@ -332,11 +350,11 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error)
332350
}
333351
}
334352

353+
_log.ConnectionWriteCallback(_connectionId, status);
354+
335355
if (scheduleWrite)
336356
{
337-
// ScheduleWrite();
338-
// on right thread, fairness issues?
339-
WriteAllPending();
357+
ScheduleWrite();
340358
}
341359

342360
_tasksCompleted.Clear();
@@ -367,6 +385,16 @@ private void ReturnAllBlocks()
367385
}
368386
}
369387

388+
private void PoolWriteContext(WriteContext writeContext)
389+
{
390+
// called inside _contextLock
391+
if (_writeContextPool.Count < _maxPooledWriteContexts)
392+
{
393+
writeContext.Reset();
394+
_writeContextPool.Enqueue(writeContext);
395+
}
396+
}
397+
370398
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
371399
{
372400
var task = WriteAsync(buffer, immediate);
@@ -412,19 +440,18 @@ private class WriteContext
412440
{
413441
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);
414442

443+
private SocketOutput Self;
444+
private UvWriteReq _writeReq;
415445
private MemoryPoolIterator2 _lockedStart;
416446
private MemoryPoolIterator2 _lockedEnd;
417447
private int _bufferCount;
418-
private int _byteCount;
419-
420-
public SocketOutput Self;
421448

449+
public int ByteCount;
422450
public bool SocketShutdownSend;
423451
public bool SocketDisconnect;
424452

425453
public int WriteStatus;
426454
public Exception WriteError;
427-
428455
public int ShutdownSendStatus;
429456

430457
public WriteContext(SocketOutput self)
@@ -439,27 +466,40 @@ public void DoWriteIfNeeded()
439466
{
440467
LockWrite();
441468

442-
if (_byteCount == 0 || Self._socket.IsClosed)
469+
if (ByteCount == 0 || Self._socket.IsClosed)
443470
{
444471
DoShutdownIfNeeded();
445472
return;
446473
}
447474

448-
var writeReq = new UvWriteReq(Self._log);
449-
writeReq.Init(Self._thread.Loop);
475+
// Sample values locally in case write completes inline
476+
// to allow block to be Reset and still complete this function
477+
var lockedEndBlock = _lockedEnd.Block;
478+
var lockedEndIndex = _lockedEnd.Index;
450479

451-
writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
480+
if (Self._writeReqPool.Count > 0)
452481
{
453-
_writeReq.Dispose();
454-
var _this = (WriteContext)state;
455-
_this.ScheduleReturnFullyWrittenBlocks();
456-
_this.WriteStatus = status;
457-
_this.WriteError = error;
458-
_this.DoShutdownIfNeeded();
482+
_writeReq = Self._writeReqPool.Dequeue();
483+
}
484+
else
485+
{
486+
_writeReq = new UvWriteReq(Self._log);
487+
_writeReq.Init(Self._thread.Loop);
488+
}
489+
490+
_writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
491+
{
492+
var writeContext = (WriteContext)state;
493+
writeContext.PoolWriteReq(writeContext._writeReq);
494+
writeContext._writeReq = null;
495+
writeContext.ScheduleReturnFullyWrittenBlocks();
496+
writeContext.WriteStatus = status;
497+
writeContext.WriteError = error;
498+
writeContext.DoShutdownIfNeeded();
459499
}, this);
460500

461-
Self._head = _lockedEnd.Block;
462-
Self._head.Start = _lockedEnd.Index;
501+
Self._head = lockedEndBlock;
502+
Self._head.Start = lockedEndIndex;
463503
}
464504

465505
/// <summary>
@@ -506,9 +546,21 @@ public void DoDisconnectIfNeeded()
506546

507547
public void Complete()
508548
{
509-
Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError);
549+
Self.OnWriteCompleted(this);
550+
}
551+
552+
private void PoolWriteReq(UvWriteReq writeReq)
553+
{
554+
if (Self._writeReqPool.Count < MaxPooledWriteReqs)
555+
{
556+
Self._writeReqPool.Enqueue(writeReq);
557+
}
558+
else
559+
{
560+
writeReq.Dispose();
561+
}
510562
}
511-
563+
512564
private void ScheduleReturnFullyWrittenBlocks()
513565
{
514566
var block = _lockedStart.Block;
@@ -556,7 +608,23 @@ private void LockWrite()
556608
_lockedStart = new MemoryPoolIterator2(head, head.Start);
557609
_lockedEnd = new MemoryPoolIterator2(tail, tail.End);
558610

559-
BytesBetween(_lockedStart, _lockedEnd, out _byteCount, out _bufferCount);
611+
BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount);
612+
}
613+
614+
public void Reset()
615+
{
616+
_lockedStart = default(MemoryPoolIterator2);
617+
_lockedEnd = default(MemoryPoolIterator2);
618+
_bufferCount = 0;
619+
ByteCount = 0;
620+
621+
SocketShutdownSend = false;
622+
SocketDisconnect = false;
623+
624+
WriteStatus = 0;
625+
WriteError = null;
626+
627+
ShutdownSendStatus = 0;
560628
}
561629
}
562630
}

src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,21 @@ namespace Microsoft.AspNet.Server.Kestrel
1818
/// </summary>
1919
public class KestrelThread
2020
{
21+
// maximum times the work queues swapped and are processed in a single pass
22+
// as completing a task may immediately have write data to put on the network
23+
// otherwise it needs to wait till the next pass of the libuv loop
24+
private const int _maxLoops = 8;
25+
2126
private static Action<object, object> _threadCallbackAdapter = (callback, state) => ((Action<KestrelThread>)callback).Invoke((KestrelThread)state);
2227
private KestrelEngine _engine;
2328
private readonly IApplicationLifetime _appLifetime;
2429
private Thread _thread;
2530
private UvLoopHandle _loop;
2631
private UvAsyncHandle _post;
27-
private Queue<Work> _workAdding = new Queue<Work>();
28-
private Queue<Work> _workRunning = new Queue<Work>();
29-
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>();
30-
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>();
32+
private Queue<Work> _workAdding = new Queue<Work>(1024);
33+
private Queue<Work> _workRunning = new Queue<Work>(1024);
34+
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>(256);
35+
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>(256);
3136
private object _workSync = new Object();
3237
private bool _stopImmediate = false;
3338
private bool _initCompleted = false;
@@ -249,11 +254,17 @@ private void ThreadStart(object parameter)
249254

250255
private void OnPost()
251256
{
252-
DoPostWork();
253-
DoPostCloseHandle();
257+
var loopsRemaining = _maxLoops;
258+
bool wasWork;
259+
do
260+
{
261+
wasWork = DoPostWork();
262+
wasWork = DoPostCloseHandle() || wasWork;
263+
loopsRemaining--;
264+
} while (wasWork && loopsRemaining > 0);
254265
}
255266

256-
private void DoPostWork()
267+
private bool DoPostWork()
257268
{
258269
Queue<Work> queue;
259270
lock (_workSync)
@@ -262,6 +273,9 @@ private void DoPostWork()
262273
_workAdding = _workRunning;
263274
_workRunning = queue;
264275
}
276+
277+
bool wasWork = queue.Count > 0;
278+
265279
while (queue.Count != 0)
266280
{
267281
var work = queue.Dequeue();
@@ -286,8 +300,10 @@ private void DoPostWork()
286300
}
287301
}
288302
}
303+
304+
return wasWork;
289305
}
290-
private void DoPostCloseHandle()
306+
private bool DoPostCloseHandle()
291307
{
292308
Queue<CloseHandle> queue;
293309
lock (_workSync)
@@ -296,6 +312,9 @@ private void DoPostCloseHandle()
296312
_closeHandleAdding = _closeHandleRunning;
297313
_closeHandleRunning = queue;
298314
}
315+
316+
bool wasWork = queue.Count > 0;
317+
299318
while (queue.Count != 0)
300319
{
301320
var closeHandle = queue.Dequeue();
@@ -309,6 +328,8 @@ private void DoPostCloseHandle()
309328
throw;
310329
}
311330
}
331+
332+
return wasWork;
312333
}
313334

314335
private struct Work

0 commit comments

Comments
 (0)