18
18
import io .netty .util .ByteProcessor ;
19
19
import io .netty .util .NettyRuntime ;
20
20
import io .netty .util .ReferenceCounted ;
21
+ import io .netty .util .concurrent .FastThreadLocal ;
21
22
import io .netty .util .concurrent .FastThreadLocalThread ;
22
23
import io .netty .util .internal .ObjectPool ;
23
24
import io .netty .util .internal .ObjectUtil ;
24
25
import io .netty .util .internal .PlatformDependent ;
25
26
import io .netty .util .internal .SuppressJava6Requirement ;
26
27
import io .netty .util .internal .SystemPropertyUtil ;
28
+ import io .netty .util .internal .ThreadExecutorMap ;
27
29
import io .netty .util .internal .UnstableApi ;
28
30
29
31
import java .io .IOException ;
37
39
import java .nio .channels .ScatteringByteChannel ;
38
40
import java .util .Arrays ;
39
41
import java .util .Queue ;
42
+ import java .util .Set ;
40
43
import java .util .concurrent .ConcurrentLinkedQueue ;
44
+ import java .util .concurrent .CopyOnWriteArraySet ;
41
45
import java .util .concurrent .atomic .AtomicLong ;
42
46
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
43
47
import java .util .concurrent .locks .StampedLock ;
73
77
@ SuppressJava6Requirement (reason = "Guarded by version check" )
74
78
@ UnstableApi
75
79
final class AdaptivePoolingAllocator {
80
+
81
+ enum MagazineCaching {
82
+ EventLoopThreads ,
83
+ FastThreadLocalThreads ,
84
+ None
85
+ }
86
+
87
+ private static final int EXPANSION_ATTEMPTS = 3 ;
88
+ private static final int INITIAL_MAGAZINES = 4 ;
76
89
private static final int RETIRE_CAPACITY = 4 * 1024 ;
77
90
private static final int MIN_CHUNK_SIZE = 128 * 1024 ;
78
91
private static final int MAX_STRIPES = NettyRuntime .availableProcessors () * 2 ;
@@ -97,20 +110,55 @@ final class AdaptivePoolingAllocator {
97
110
private static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil .getInt (
98
111
"io.netty.allocator.centralQueueCapacity" , NettyRuntime .availableProcessors ());
99
112
113
+ private static final Object NO_MAGAZINE = Boolean .TRUE ;
114
+
100
115
private final ChunkAllocator chunkAllocator ;
101
116
private final Queue <ChunkByteBuf > centralQueue ;
102
117
private final StampedLock magazineExpandLock ;
103
118
private volatile Magazine [] magazines ;
119
+ private final FastThreadLocal <Object > threadLocalMagazine ;
120
+ private final Set <Magazine > liveCachedMagazines ;
104
121
105
- AdaptivePoolingAllocator (ChunkAllocator chunkAllocator ) {
122
+ AdaptivePoolingAllocator (ChunkAllocator chunkAllocator , MagazineCaching magazineCaching ) {
123
+ ObjectUtil .checkNotNull (chunkAllocator , "chunkAllocator" );
124
+ ObjectUtil .checkNotNull (magazineCaching , "magazineCaching" );
106
125
this .chunkAllocator = chunkAllocator ;
107
126
if (javaVersion () < 8 ) {
108
127
// The implementation uses StampedLock, which was introduced in Java 8.
109
128
throw new IllegalStateException ("This allocator require Java 8 or newer." );
110
129
}
111
130
centralQueue = ObjectUtil .checkNotNull (createSharedChunkQueue (), "centralQueue" );
112
131
magazineExpandLock = new StampedLock ();
113
- Magazine [] mags = new Magazine [4 ];
132
+ if (magazineCaching != MagazineCaching .None ) {
133
+ assert magazineCaching == MagazineCaching .EventLoopThreads ||
134
+ magazineCaching == MagazineCaching .FastThreadLocalThreads ;
135
+ final boolean cachedMagazinesNonEventLoopThreads =
136
+ magazineCaching == MagazineCaching .FastThreadLocalThreads ;
137
+ final Set <Magazine > liveMagazines = new CopyOnWriteArraySet <Magazine >();
138
+ threadLocalMagazine = new FastThreadLocal <Object >() {
139
+ @ Override
140
+ protected Object initialValue () {
141
+ if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap .currentExecutor () != null ) {
142
+ Magazine mag = new Magazine (AdaptivePoolingAllocator .this , false );
143
+ liveMagazines .add (mag );
144
+ return mag ;
145
+ }
146
+ return NO_MAGAZINE ;
147
+ }
148
+
149
+ @ Override
150
+ protected void onRemoval (final Object value ) throws Exception {
151
+ if (value != NO_MAGAZINE ) {
152
+ liveMagazines .remove (value );
153
+ }
154
+ }
155
+ };
156
+ liveCachedMagazines = liveMagazines ;
157
+ } else {
158
+ threadLocalMagazine = null ;
159
+ liveCachedMagazines = null ;
160
+ }
161
+ Magazine [] mags = new Magazine [INITIAL_MAGAZINES ];
114
162
for (int i = 0 ; i < mags .length ; i ++) {
115
163
mags [i ] = new Magazine (this );
116
164
}
@@ -158,8 +206,15 @@ ByteBuf allocate(int size, int maxCapacity) {
158
206
}
159
207
160
208
private AdaptiveByteBuf allocate (int size , int maxCapacity , Thread currentThread , AdaptiveByteBuf buf ) {
161
- long threadId = currentThread .getId ();
162
209
int sizeBucket = AllocationStatistics .sizeBucket (size ); // Compute outside of Magazine lock for better ILP.
210
+ FastThreadLocal <Object > threadLocalMagazine = this .threadLocalMagazine ;
211
+ if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread ) {
212
+ Object mag = threadLocalMagazine .get ();
213
+ if (mag != NO_MAGAZINE ) {
214
+ return ((Magazine ) mag ).allocate (size , sizeBucket , maxCapacity , buf );
215
+ }
216
+ }
217
+ long threadId = currentThread .getId ();
163
218
Magazine [] mags ;
164
219
int expansions = 0 ;
165
220
do {
@@ -178,7 +233,7 @@ private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread
178
233
}
179
234
}
180
235
expansions ++;
181
- } while (expansions <= 3 && tryExpandMagazines (mags .length ));
236
+ } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines (mags .length ));
182
237
return null ;
183
238
}
184
239
@@ -204,6 +259,11 @@ long usedMemory() {
204
259
for (Magazine magazine : magazines ) {
205
260
sum += magazine .usedMemory .get ();
206
261
}
262
+ if (liveCachedMagazines != null ) {
263
+ for (Magazine magazine : liveCachedMagazines ) {
264
+ sum += magazine .usedMemory .get ();
265
+ }
266
+ }
207
267
return sum ;
208
268
}
209
269
@@ -247,6 +307,7 @@ private static class AllocationStatistics extends StampedLock {
247
307
private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1 ;
248
308
249
309
protected final AdaptivePoolingAllocator parent ;
310
+ private final boolean shareable ;
250
311
private final short [][] histos = {
251
312
new short [HISTO_BUCKET_COUNT ], new short [HISTO_BUCKET_COUNT ],
252
313
new short [HISTO_BUCKET_COUNT ], new short [HISTO_BUCKET_COUNT ],
@@ -260,8 +321,9 @@ private static class AllocationStatistics extends StampedLock {
260
321
private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE ;
261
322
protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE ;
262
323
263
- private AllocationStatistics (AdaptivePoolingAllocator parent ) {
324
+ private AllocationStatistics (AdaptivePoolingAllocator parent , boolean shareable ) {
264
325
this .parent = parent ;
326
+ this .shareable = shareable ;
265
327
}
266
328
267
329
protected void recordAllocationSize (int bucket ) {
@@ -300,8 +362,10 @@ private void rotateHistograms() {
300
362
int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT ;
301
363
int prefChunkSize = Math .max (percentileSize * BUFS_PER_CHUNK , MIN_CHUNK_SIZE );
302
364
localPrefChunkSize = prefChunkSize ;
303
- for (Magazine mag : parent .magazines ) {
304
- prefChunkSize = Math .max (prefChunkSize , mag .localPrefChunkSize );
365
+ if (shareable ) {
366
+ for (Magazine mag : parent .magazines ) {
367
+ prefChunkSize = Math .max (prefChunkSize , mag .localPrefChunkSize );
368
+ }
305
369
}
306
370
if (sharedPrefChunkSize != prefChunkSize ) {
307
371
// Preferred chunk size changed. Increase check frequency.
@@ -344,7 +408,11 @@ private static final class Magazine extends AllocationStatistics {
344
408
private final AtomicLong usedMemory ;
345
409
346
410
Magazine (AdaptivePoolingAllocator parent ) {
347
- super (parent );
411
+ this (parent , true );
412
+ }
413
+
414
+ Magazine (AdaptivePoolingAllocator parent , boolean shareable ) {
415
+ super (parent , shareable );
348
416
usedMemory = new AtomicLong ();
349
417
}
350
418
0 commit comments