Commit 0c9cca7

Merge: 2 parents, d3493c9 + cd8f18b

103 files changed: +4519 −3616 lines changed

Note: this is a large commit; only a subset of the 103 changed files is shown below.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 1 addition & 2 deletions
@@ -1462,7 +1462,6 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
   private void checkAsyncCall() throws IOException {
     if (isAsynchronousMode()) {
       if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
-        asyncCallCounter.decrementAndGet();
         String errMsg = String.format(
             "Exceeded limit of max asynchronous calls: %d, " +
                 "please configure %s to adjust it.",
@@ -1518,7 +1517,7 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
         ioe.initCause(ie);
         throw ioe;
       }
-    } catch(Exception e) {
+    } catch (Exception e) {
       if (isAsynchronousMode()) {
         releaseAsyncCall();
       }
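A note on the first hunk: checkAsyncCall() bounds in-flight asynchronous calls by incrementing an AtomicInteger and checking it against maxAsyncCalls. The caller's catch block (visible as context in the second hunk) already calls releaseAsyncCall() when this check throws, so the inline decrementAndGet() presumably released the slot twice; removing it leaves exactly one release per failed acquire. A minimal sketch of the idiom under that assumption; AsyncLimiter and its names are illustrative, not Hadoop API:

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative bounded-concurrency counter: one release per failed acquire,
// performed by the caller's error path rather than inline.
final class AsyncLimiter {
  private final AtomicInteger inFlight = new AtomicInteger();
  private final int maxCalls;

  AsyncLimiter(int maxCalls) {
    this.maxCalls = maxCalls;
  }

  void acquire() {
    if (inFlight.incrementAndGet() > maxCalls) {
      // Do not also decrement here if the caller's catch block releases;
      // releasing in both places underflows the counter.
      throw new IllegalStateException(
          "Exceeded limit of max asynchronous calls: " + maxCalls);
    }
  }

  void release() {
    inFlight.decrementAndGet();
  }
}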

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java

Lines changed: 6 additions & 1 deletion
@@ -118,4 +118,9 @@ public PathLocation getDestinationForPath(String path) throws IOException {
   public void addResolver(DestinationOrder order, OrderedResolver resolver) {
     orderedResolvers.put(order, resolver);
   }
-}
+
+  @VisibleForTesting
+  public OrderedResolver getOrderedResolver(DestinationOrder order) {
+    return orderedResolvers.get(order);
+  }
+}
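The @VisibleForTesting accessor exposes the per-order resolver read-only. The test added later in this commit uses it roughly as follows (a fragment for illustration; the types are the ones shown in these diffs):

// Reach the LOCAL-order resolver through the new accessor (test-only usage).
MultipleDestinationMountTableResolver resolver =
    (MultipleDestinationMountTableResolver) routerContext.getRouter().getSubclusterResolver();
LocalResolver localResolver =
    (LocalResolver) resolver.getOrderedResolver(DestinationOrder.LOCAL);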

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java

Lines changed: 13 additions & 5 deletions
@@ -45,6 +45,7 @@
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.net.HostAndPort;
 
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 
 /**
  * The local subcluster (where the writer is) should be tried first. The writer
@@ -124,9 +125,9 @@ String getClientAddr() {
    * needs to be done as a privileged action to use the user for the Router and
    * not the one from the client in the RPC call.
    *
-   * @return DN IP -> Subcluster.
+   * @return DN IP -> Subcluster.
    */
-  private Map<String, String> getDatanodesSubcluster() {
+  public Map<String, String> getDatanodesSubcluster() {
 
     final RouterRpcServer rpcServer = getRpcServer();
     if (rpcServer == null) {
@@ -143,9 +144,16 @@ private Map<String, String> getDatanodesSubcluster() {
       @Override
       public Map<String, DatanodeStorageReport[]> run() {
         try {
-          return rpcServer.getDatanodeStorageReportMap(
-              DatanodeReportType.ALL);
-        } catch (IOException e) {
+          Map<String, DatanodeStorageReport[]> result;
+          if (rpcServer.isAsync()) {
+            rpcServer.getDatanodeStorageReportMapAsync(DatanodeReportType.ALL);
+            result = syncReturn(Map.class);
+          } else {
+            result = rpcServer.getDatanodeStorageReportMap(
+                DatanodeReportType.ALL);
+          }
+          return result;
+        } catch (Exception e) {
           LOG.error("Cannot get the datanodes from the RPC server", e);
           return null;
         }
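Two details in the last hunk are worth spelling out: when the router runs in async mode, the Async RPC variant stages its result out-of-band and syncReturn(Map.class) blocks to collect it; and the catch widens from IOException to Exception because syncReturn is declared to throw Exception (it rethrows whatever cause the async call failed with). A toy model of that contract, an assumption for illustration rather than the actual Hadoop implementation:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Toy model: an async call parks its result in a thread-bound future,
// and syncReturn blocks on it, unwrapping the original failure cause.
final class SyncReturnModel {
  private static final ThreadLocal<CompletableFuture<Object>> CURRENT =
      new ThreadLocal<>();

  // Conceptually what an *Async RPC variant does before returning.
  static void stageAsyncResult(CompletableFuture<Object> future) {
    CURRENT.set(future);
  }

  @SuppressWarnings("unchecked")
  static <R> R syncReturn(Class<R> clazz) throws Exception {
    try {
      return (R) CURRENT.get().get();
    } catch (ExecutionException e) {
      // Rethrow the real cause; this is why callers must catch Exception.
      throw (Exception) e.getCause();
    }
  }
}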

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java

Lines changed: 1 addition & 1 deletion
@@ -127,7 +127,7 @@ public static <R> R syncReturn(Class<R> clazz)
     try {
       return (R) completableFuture.get();
     } catch (ExecutionException e) {
-      throw (Exception)e.getCause();
+      throw (Exception) e.getCause();
     }
   }
 

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRPCMultipleDestinationMountTableResolver.java

Lines changed: 52 additions & 2 deletions
@@ -25,16 +25,22 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.LocalResolver;
 import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
 import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
 import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.router.TestRouterRPCMultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -52,14 +58,21 @@
 public class TestRouterAsyncRPCMultipleDestinationMountTableResolver extends
     TestRouterRPCMultipleDestinationMountTableResolver {
 
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterAsyncRPCMultipleDestinationMountTableResolver.class);
+
   @BeforeClass
   public static void setUp() throws Exception {
 
     // Build and start a federated cluster.
     cluster = new StateStoreDFSCluster(false, 3,
         MultipleDestinationMountTableResolver.class);
-    Configuration routerConf =
-        new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .quota()
+        .rpc()
+        .build();
     routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
 
     Configuration hdfsConf = new Configuration(false);
@@ -84,6 +97,43 @@ public static void setUp() throws Exception {
     rpcServer =routerContext.getRouter().getRpcServer();
   }
 
+  @Test
+  public void testLocalResolverGetDatanodesSubcluster() throws IOException {
+    String testPath = "/testLocalResolverGetDatanodesSubcluster";
+    Path path = new Path(testPath);
+    Map<String, String> destMap = new HashMap<>();
+    destMap.put("ns0", testPath);
+    destMap.put("ns1", testPath);
+    nnFs0.mkdirs(path);
+    nnFs1.mkdirs(path);
+    MountTable addEntry =
+        MountTable.newInstance(testPath, destMap);
+    addEntry.setQuota(new RouterQuotaUsage.Builder().build());
+    addEntry.setDestOrder(DestinationOrder.LOCAL);
+    assertTrue(addMountTable(addEntry));
+
+    Map<String, String> datanodesSubcluster = null;
+    try {
+      MultipleDestinationMountTableResolver resolver =
+          (MultipleDestinationMountTableResolver) routerContext.getRouter().getSubclusterResolver();
+      LocalResolver localResolver =
+          (LocalResolver) resolver.getOrderedResolver(DestinationOrder.LOCAL);
+      datanodesSubcluster = localResolver.getDatanodesSubcluster();
+    } catch (Exception e) {
+      LOG.info("Exception occurs when testLocalResolverGetDatanodesSubcluster.", e);
+    } finally {
+      RouterClient client = routerContext.getAdminClient();
+      MountTableManager mountTableManager = client.getMountTableManager();
+      RemoveMountTableEntryRequest req2 =
+          RemoveMountTableEntryRequest.newInstance(testPath);
+      mountTableManager.removeMountTableEntry(req2);
+      nnFs0.delete(new Path(testPath), true);
+      nnFs1.delete(new Path(testPath), true);
+    }
+    assertNotNull(datanodesSubcluster);
+    assertFalse(datanodesSubcluster.isEmpty());
+  }
+
   @Override
   @Test
   public void testInvokeAtAvailableNs() throws IOException {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java

Lines changed: 35 additions & 2 deletions
@@ -44,6 +44,7 @@
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.ReferenceCounted;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.io.ByteArrayOutputStream;
@@ -61,6 +62,7 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import javax.crypto.SecretKey;
@@ -115,6 +117,7 @@ public void testGetMapsChunkedFileSSl() throws Exception {
     final LinkedList<Object> unencryptedMessages = new LinkedList<>();
     final EmbeddedChannel shuffle = t.createShuffleHandlerSSL(unencryptedMessages);
     t.testGetAllAttemptsForReduce0NoKeepAlive(unencryptedMessages, shuffle);
+    drainChannel(shuffle);
   }
 
   @Test
@@ -192,8 +195,10 @@ public void testIncompatibleShuffleVersion() {
 
     assertEquals(getExpectedHttpResponse(HttpResponseStatus.BAD_REQUEST).toString(),
         actual.toString());
+    tryRelease(actual);
 
     assertFalse(shuffle.isActive(), "closed"); // known-issue
+    drainChannel(decoder);
   }
 
   @Test
@@ -210,11 +215,13 @@ public void testInvalidMapNoIndexFile() {
     }
 
     DefaultHttpResponse actual = decoder.readInbound();
+    drainChannel(decoder);
     assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
     actual.headers().set(CONTENT_LENGTH, 0);
 
     assertEquals(getExpectedHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR).toString(),
         actual.toString());
+    tryRelease(actual);
 
     assertFalse(shuffle.isActive(), "closed");
   }
@@ -237,15 +244,36 @@ public void testInvalidMapNoDataFile() {
     }
 
     DefaultHttpResponse actual = decoder.readInbound();
+    drainChannel(decoder);
    assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
     actual.headers().set(CONTENT_LENGTH, 0);
 
     assertEquals(getExpectedHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR).toString(),
         actual.toString());
+    tryRelease(actual);
 
     assertFalse(shuffle.isActive(), "closed");
   }
 
+  private void drainChannel(EmbeddedChannel ch) {
+    Object o;
+    while((o = ch.readInbound())!=null) {
+      tryRelease(o);
+    }
+    while((o = ch.readOutbound())!=null) {
+      tryRelease(o);
+    }
+  }
+
+  private void tryRelease(Object obj) {
+    if (obj instanceof ReferenceCounted) {
+      ReferenceCounted bb = (ReferenceCounted) obj;
+      if (bb.refCnt() > 0) {
+        bb.release(bb.refCnt());
+      }
+    }
+  }
+
   private DefaultHttpResponse getExpectedHttpResponse(HttpResponseStatus status) {
     DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
     response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
@@ -365,8 +393,8 @@ private void testGetAllAttemptsForReduce0NoKeepAlive(
     assertFalse(shuffle.isActive(), "no keep-alive");
   }
 
-  private void testKeepAlive(java.util.Queue<Object> messages,
-      EmbeddedChannel shuffle) throws IOException {
+  private void testKeepAlive(java.util.Queue<Object> messages, EmbeddedChannel shuffle)
+      throws IOException, InterruptedException, ExecutionException {
     final FullHttpRequest req1 = createRequest(
         getUri(TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true));
     shuffle.writeInbound(req1);
@@ -375,6 +403,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
         getAttemptData(new Attempt(TEST_ATTEMPT_1, TEST_DATA_A))
     );
     assertTrue(shuffle.isActive(), "keep-alive");
+    drainChannel(shuffle);
     messages.clear();
 
     final FullHttpRequest req2 = createRequest(
@@ -385,6 +414,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
         getAttemptData(new Attempt(TEST_ATTEMPT_2, TEST_DATA_B))
     );
     assertTrue(shuffle.isActive(), "keep-alive");
+    drainChannel(shuffle);
     messages.clear();
 
     final FullHttpRequest req3 = createRequest(
@@ -395,6 +425,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
         getAttemptData(new Attempt(TEST_ATTEMPT_3, TEST_DATA_C))
     );
     assertFalse(shuffle.isActive(), "no keep-alive");
+    drainChannel(shuffle);
   }
 
   private ArrayList<ByteBuf> getAllAttemptsForReduce0() throws IOException {
@@ -431,11 +462,13 @@ private void assertResponse(java.util.Queue<Object> outboundMessages,
     decodeChannel.writeInbound(actualBytes);
     Object obj = decodeChannel.readInbound();
     LOG.info("Decoded object: {}", obj);
+    drainChannel(decodeChannel);
 
     if (i == 0) {
       DefaultHttpResponse resp = (DefaultHttpResponse) obj;
       assertEquals(response.toString(), resp.toString());
     }
+    tryRelease(obj);
     if (i > 0 && i <= content.size()) {
       assertEquals(ByteBufUtil.prettyHexDump(content.get(i - 1)),
           actualHexdump, "data should match");
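The drainChannel/tryRelease helpers release every ReferenceCounted message left in the embedded channel so Netty's leak detector stays quiet. For tests that do not need to inspect the queued messages first, EmbeddedChannel ships equivalent cleanup methods; a small sketch of that alternative:

import io.netty.channel.embedded.EmbeddedChannel;

// Alternative cleanup using EmbeddedChannel's own release helpers.
final class ChannelCleanup {
  static void drain(EmbeddedChannel ch) {
    ch.releaseInbound();   // releases all queued inbound ReferenceCounted messages
    ch.releaseOutbound();  // same for the outbound queue
    // Or at the end of a test: ch.finishAndReleaseAll() finishes the
    // channel and releases both queues in one call.
  }
}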

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java

Lines changed: 7 additions & 0 deletions
@@ -84,6 +84,13 @@ public void setup() throws IOException {
 
   @AfterEach
   public void teardown() {
+    //Trigger GC so that we get the leak warnings early
+    System.gc();
+    try {
+      // Wait for logger to flush
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+    }
     System.setOut(standardOut);
     System.out.print(outputStreamCaptor);
     // For this to work ch.qos.logback.classic is needed for some reason
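The forced GC works because Netty's leak detector only reports a leaked buffer once its wrapper is garbage-collected. A common companion in tests (an assumption here; this suite may configure it elsewhere) is raising the detector's sampling level so every allocation is tracked:

import io.netty.util.ResourceLeakDetector;

final class LeakDetectionSetup {
  static {
    // PARANOID samples every ByteBuf allocation, so leaks surface
    // deterministically in tests (the default SIMPLE level samples ~1%).
    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
  }
}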
