44
44
import io .netty .handler .ssl .SslContextBuilder ;
45
45
import io .netty .handler .ssl .SslHandler ;
46
46
import io .netty .handler .stream .ChunkedWriteHandler ;
47
+ import io .netty .util .ReferenceCounted ;
47
48
import io .netty .util .concurrent .GlobalEventExecutor ;
48
49
49
50
import java .io .ByteArrayOutputStream ;
61
62
import java .util .List ;
62
63
import java .util .Objects ;
63
64
import java .util .concurrent .ConcurrentHashMap ;
65
+ import java .util .concurrent .ExecutionException ;
64
66
import java .util .concurrent .TimeUnit ;
65
67
66
68
import javax .crypto .SecretKey ;
@@ -115,6 +117,7 @@ public void testGetMapsChunkedFileSSl() throws Exception {
115
117
final LinkedList <Object > unencryptedMessages = new LinkedList <>();
116
118
final EmbeddedChannel shuffle = t .createShuffleHandlerSSL (unencryptedMessages );
117
119
t .testGetAllAttemptsForReduce0NoKeepAlive (unencryptedMessages , shuffle );
120
+ drainChannel (shuffle );
118
121
}
119
122
120
123
@ Test
@@ -192,8 +195,10 @@ public void testIncompatibleShuffleVersion() {
192
195
193
196
assertEquals (getExpectedHttpResponse (HttpResponseStatus .BAD_REQUEST ).toString (),
194
197
actual .toString ());
198
+ tryRelease (actual );
195
199
196
200
assertFalse (shuffle .isActive (), "closed" ); // known-issue
201
+ drainChannel (decoder );
197
202
}
198
203
199
204
@ Test
@@ -210,11 +215,13 @@ public void testInvalidMapNoIndexFile() {
210
215
}
211
216
212
217
DefaultHttpResponse actual = decoder .readInbound ();
218
+ drainChannel (decoder );
213
219
assertFalse (actual .headers ().get (CONTENT_LENGTH ).isEmpty ());
214
220
actual .headers ().set (CONTENT_LENGTH , 0 );
215
221
216
222
assertEquals (getExpectedHttpResponse (HttpResponseStatus .INTERNAL_SERVER_ERROR ).toString (),
217
223
actual .toString ());
224
+ tryRelease (actual );
218
225
219
226
assertFalse (shuffle .isActive (), "closed" );
220
227
}
@@ -237,15 +244,36 @@ public void testInvalidMapNoDataFile() {
237
244
}
238
245
239
246
DefaultHttpResponse actual = decoder .readInbound ();
247
+ drainChannel (decoder );
240
248
assertFalse (actual .headers ().get (CONTENT_LENGTH ).isEmpty ());
241
249
actual .headers ().set (CONTENT_LENGTH , 0 );
242
250
243
251
assertEquals (getExpectedHttpResponse (HttpResponseStatus .INTERNAL_SERVER_ERROR ).toString (),
244
252
actual .toString ());
253
+ tryRelease (actual );
245
254
246
255
assertFalse (shuffle .isActive (), "closed" );
247
256
}
248
257
258
+ private void drainChannel (EmbeddedChannel ch ) {
259
+ Object o ;
260
+ while ((o = ch .readInbound ())!=null ) {
261
+ tryRelease (o );
262
+ }
263
+ while ((o = ch .readOutbound ())!=null ) {
264
+ tryRelease (o );
265
+ }
266
+ }
267
+
268
+ private void tryRelease (Object obj ) {
269
+ if (obj instanceof ReferenceCounted ) {
270
+ ReferenceCounted bb = (ReferenceCounted ) obj ;
271
+ if (bb .refCnt () > 0 ) {
272
+ bb .release (bb .refCnt ());
273
+ }
274
+ }
275
+ }
276
+
249
277
private DefaultHttpResponse getExpectedHttpResponse (HttpResponseStatus status ) {
250
278
DefaultHttpResponse response = new DefaultHttpResponse (HTTP_1_1 , status );
251
279
response .headers ().set (CONTENT_TYPE , "text/plain; charset=UTF-8" );
@@ -365,8 +393,8 @@ private void testGetAllAttemptsForReduce0NoKeepAlive(
365
393
assertFalse (shuffle .isActive (), "no keep-alive" );
366
394
}
367
395
368
- private void testKeepAlive (java .util .Queue <Object > messages ,
369
- EmbeddedChannel shuffle ) throws IOException {
396
+ private void testKeepAlive (java .util .Queue <Object > messages , EmbeddedChannel shuffle )
397
+ throws IOException , InterruptedException , ExecutionException {
370
398
final FullHttpRequest req1 = createRequest (
371
399
getUri (TEST_JOB_ID , 0 , Collections .singletonList (TEST_ATTEMPT_1 ), true ));
372
400
shuffle .writeInbound (req1 );
@@ -375,6 +403,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
375
403
getAttemptData (new Attempt (TEST_ATTEMPT_1 , TEST_DATA_A ))
376
404
);
377
405
assertTrue (shuffle .isActive (), "keep-alive" );
406
+ drainChannel (shuffle );
378
407
messages .clear ();
379
408
380
409
final FullHttpRequest req2 = createRequest (
@@ -385,6 +414,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
385
414
getAttemptData (new Attempt (TEST_ATTEMPT_2 , TEST_DATA_B ))
386
415
);
387
416
assertTrue (shuffle .isActive (), "keep-alive" );
417
+ drainChannel (shuffle );
388
418
messages .clear ();
389
419
390
420
final FullHttpRequest req3 = createRequest (
@@ -395,6 +425,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
395
425
getAttemptData (new Attempt (TEST_ATTEMPT_3 , TEST_DATA_C ))
396
426
);
397
427
assertFalse (shuffle .isActive (), "no keep-alive" );
428
+ drainChannel (shuffle );
398
429
}
399
430
400
431
private ArrayList <ByteBuf > getAllAttemptsForReduce0 () throws IOException {
@@ -431,11 +462,13 @@ private void assertResponse(java.util.Queue<Object> outboundMessages,
431
462
decodeChannel .writeInbound (actualBytes );
432
463
Object obj = decodeChannel .readInbound ();
433
464
LOG .info ("Decoded object: {}" , obj );
465
+ drainChannel (decodeChannel );
434
466
435
467
if (i == 0 ) {
436
468
DefaultHttpResponse resp = (DefaultHttpResponse ) obj ;
437
469
assertEquals (response .toString (), resp .toString ());
438
470
}
471
+ tryRelease (obj );
439
472
if (i > 0 && i <= content .size ()) {
440
473
assertEquals (ByteBufUtil .prettyHexDump (content .get (i - 1 )),
441
474
actualHexdump , "data should match" );
0 commit comments