Skip to content

MAPREDUCE-7503. Fix ByteBuf leaks in TestShuffleChannelHandler #7500

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.io.ByteArrayOutputStream;
Expand All @@ -61,6 +62,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import javax.crypto.SecretKey;
Expand Down Expand Up @@ -115,6 +117,7 @@ public void testGetMapsChunkedFileSSl() throws Exception {
final LinkedList<Object> unencryptedMessages = new LinkedList<>();
final EmbeddedChannel shuffle = t.createShuffleHandlerSSL(unencryptedMessages);
t.testGetAllAttemptsForReduce0NoKeepAlive(unencryptedMessages, shuffle);
drainChannel(shuffle);
}

@Test
Expand Down Expand Up @@ -192,8 +195,10 @@ public void testIncompatibleShuffleVersion() {

assertEquals(getExpectedHttpResponse(HttpResponseStatus.BAD_REQUEST).toString(),
actual.toString());
tryRelease(actual);

assertFalse(shuffle.isActive(), "closed"); // known-issue
drainChannel(decoder);
}

@Test
Expand All @@ -210,11 +215,13 @@ public void testInvalidMapNoIndexFile() {
}

DefaultHttpResponse actual = decoder.readInbound();
drainChannel(decoder);
assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
actual.headers().set(CONTENT_LENGTH, 0);

assertEquals(getExpectedHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR).toString(),
actual.toString());
tryRelease(actual);

assertFalse(shuffle.isActive(), "closed");
}
Expand All @@ -237,15 +244,36 @@ public void testInvalidMapNoDataFile() {
}

DefaultHttpResponse actual = decoder.readInbound();
drainChannel(decoder);
assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
actual.headers().set(CONTENT_LENGTH, 0);

assertEquals(getExpectedHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR).toString(),
actual.toString());
tryRelease(actual);

assertFalse(shuffle.isActive(), "closed");
}

private void drainChannel(EmbeddedChannel ch) {
Object o;
while((o = ch.readInbound())!=null) {
tryRelease(o);
}
while((o = ch.readOutbound())!=null) {
tryRelease(o);
}
}

private void tryRelease(Object obj) {
if (obj instanceof ReferenceCounted) {
ReferenceCounted bb = (ReferenceCounted) obj;
if (bb.refCnt() > 0) {
bb.release(bb.refCnt());
}
}
}

private DefaultHttpResponse getExpectedHttpResponse(HttpResponseStatus status) {
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
Expand Down Expand Up @@ -365,8 +393,8 @@ private void testGetAllAttemptsForReduce0NoKeepAlive(
assertFalse(shuffle.isActive(), "no keep-alive");
}

private void testKeepAlive(java.util.Queue<Object> messages,
EmbeddedChannel shuffle) throws IOException {
private void testKeepAlive(java.util.Queue<Object> messages, EmbeddedChannel shuffle)
throws IOException, InterruptedException, ExecutionException {
final FullHttpRequest req1 = createRequest(
getUri(TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true));
shuffle.writeInbound(req1);
Expand All @@ -375,6 +403,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
getAttemptData(new Attempt(TEST_ATTEMPT_1, TEST_DATA_A))
);
assertTrue(shuffle.isActive(), "keep-alive");
drainChannel(shuffle);
messages.clear();

final FullHttpRequest req2 = createRequest(
Expand All @@ -385,6 +414,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
getAttemptData(new Attempt(TEST_ATTEMPT_2, TEST_DATA_B))
);
assertTrue(shuffle.isActive(), "keep-alive");
drainChannel(shuffle);
messages.clear();

final FullHttpRequest req3 = createRequest(
Expand All @@ -395,6 +425,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
getAttemptData(new Attempt(TEST_ATTEMPT_3, TEST_DATA_C))
);
assertFalse(shuffle.isActive(), "no keep-alive");
drainChannel(shuffle);
}

private ArrayList<ByteBuf> getAllAttemptsForReduce0() throws IOException {
Expand Down Expand Up @@ -431,11 +462,13 @@ private void assertResponse(java.util.Queue<Object> outboundMessages,
decodeChannel.writeInbound(actualBytes);
Object obj = decodeChannel.readInbound();
LOG.info("Decoded object: {}", obj);
drainChannel(decodeChannel);

if (i == 0) {
DefaultHttpResponse resp = (DefaultHttpResponse) obj;
assertEquals(response.toString(), resp.toString());
}
tryRelease(obj);
if (i > 0 && i <= content.size()) {
assertEquals(ByteBufUtil.prettyHexDump(content.get(i - 1)),
actualHexdump, "data should match");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ public void setup() throws IOException {

@AfterEach
public void teardown() {
//Trigger GC so that we get the leak warnings early
System.gc();
try {
// Wait for logger to flush
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.setOut(standardOut);
System.out.print(outputStreamCaptor);
// For this to work ch.qos.logback.classic is needed for some reason
Expand Down