Skip to content

Commit edf8db8

Browse files
committed
[YARN-10058] Handle uncaught exception for async-scheduling threads to prevent scheduler hangs.
1 parent 6589d9f commit edf8db8

File tree

4 files changed

+108
-1
lines changed

4 files changed

+108
-1
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.atomic.AtomicBoolean;
3737

3838
import org.apache.commons.lang3.StringUtils;
39+
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
3940
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
4041
import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
4142
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
@@ -3543,7 +3544,10 @@ static class AsyncSchedulingConfiguration {
35433544

35443545
this.asyncSchedulerThreads = new ArrayList<>();
35453546
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
3546-
asyncSchedulerThreads.add(new AsyncScheduleThread(cs));
3547+
AsyncScheduleThread ast = new AsyncScheduleThread(cs);
3548+
ast.setUncaughtExceptionHandler(
3549+
new RMCriticalThreadUncaughtExceptionHandler(cs.rmContext));
3550+
asyncSchedulerThreads.add(ast);
35473551
}
35483552
this.resourceCommitterService = new ResourceCommitterService(cs);
35493553
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
3030
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
3131
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling;
32+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
3233
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
3334
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
3435
import org.junit.Assert;
@@ -135,6 +136,52 @@ public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
135136
rm2.stop();
136137
}
137138

139+
@Test(timeout = 30000)
140+
public void testAsyncScheduleThreadExit() throws Exception {
141+
// start two RMs, and transit rm1 to active, rm2 to standby
142+
startRMs();
143+
// register NM
144+
rm1.registerNode("192.1.1.1:1234", 8192, 8);
145+
rm1.drainEvents();
146+
147+
// test async-scheduling thread exit
148+
try{
149+
// set resource calculator to be null to simulate
150+
// NPE in async-scheduling thread
151+
CapacityScheduler cs =
152+
(CapacityScheduler) rm1.getRMContext().getScheduler();
153+
cs.setResourceCalculator(null);
154+
checkAsyncSchedulerThreads(Thread.currentThread());
155+
156+
// wait for RM to be shutdown until timeout
157+
boolean done = TestUtils.waitForUntilTimeout(
158+
() -> rm1.getRMContext().getHAServiceState()
159+
== HAServiceProtocol.HAServiceState.STANDBY, 100, 5000);
160+
Assert.assertTrue(
161+
"RM1 should be transitioned to standby, but got state: "
162+
+ rm1.getRMContext().getHAServiceState(), done);
163+
164+
// failover RM2 to RM1
165+
HAServiceProtocol.StateChangeRequestInfo requestInfo =
166+
new HAServiceProtocol.StateChangeRequestInfo(
167+
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
168+
rm2.adminService.transitionToStandby(requestInfo);
169+
rm1.adminService.transitionToActive(requestInfo);
170+
done = TestUtils.waitForUntilTimeout(
171+
() -> rm1.getRMContext().getHAServiceState()
172+
== HAServiceProtocol.HAServiceState.ACTIVE, 100, 5000);
173+
Assert.assertTrue(
174+
"RM1 should be transitioned to active, but got state: "
175+
+ rm1.getRMContext().getHAServiceState(), done);
176+
177+
// make sure async-scheduling thread is correct after failover
178+
checkAsyncSchedulerThreads(Thread.currentThread());
179+
} finally {
180+
rm1.stop();
181+
rm2.stop();
182+
}
183+
}
184+
138185
private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception {
139186
MockRMAppSubmissionData data =
140187
MockRMAppSubmissionData.Builder.createWithMemory(200, rm)

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.junit.Assert;
6868
import org.junit.Before;
6969
import org.junit.Test;
70+
import org.junit.contrib.java.lang.system.internal.NoExitSecurityManager;
7071
import org.mockito.Mockito;
7172
import org.mockito.invocation.InvocationOnMock;
7273
import org.mockito.stubbing.Answer;
@@ -1072,6 +1073,38 @@ public Boolean answer(InvocationOnMock invocation) throws Exception {
10721073
rm.stop();
10731074
}
10741075

1076+
@Test(timeout = 30000)
1077+
public void testAsyncScheduleThreadExit() throws Exception {
1078+
// init RM & NM
1079+
final MockRM rm = new MockRM(conf);
1080+
rm.start();
1081+
rm.registerNode("192.168.0.1:1234", 8 * GB);
1082+
rm.drainEvents();
1083+
1084+
// Set no exit security manager to catch System.exit
1085+
SecurityManager originalSecurityManager = System.getSecurityManager();
1086+
NoExitSecurityManager noExitSecurityManager =
1087+
new NoExitSecurityManager(originalSecurityManager);
1088+
System.setSecurityManager(noExitSecurityManager);
1089+
1090+
// test async-scheduling thread exit
1091+
try{
1092+
// set resource calculator to be null to simulate
1093+
// NPE in async-scheduling thread
1094+
CapacityScheduler cs =
1095+
(CapacityScheduler) rm.getRMContext().getScheduler();
1096+
cs.setResourceCalculator(null);
1097+
1098+
// wait for RM to be shutdown until timeout
1099+
boolean done = TestUtils.waitForUntilTimeout(
1100+
noExitSecurityManager::isCheckExitCalled, 100, 5000);
1101+
Assert.assertTrue("RM should be shut down, but nothing happened", done);
1102+
} finally {
1103+
System.setSecurityManager(originalSecurityManager);
1104+
rm.stop();
1105+
}
1106+
}
1107+
10751108
private ResourceCommitRequest createAllocateFromReservedProposal(
10761109
int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
10771110
SchedulerNode allocateNode, SchedulerNode reservedNode,

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.io.IOException;
5555
import java.util.Map;
5656
import java.util.Set;
57+
import java.util.function.Supplier;
5758

5859
import static org.mockito.ArgumentMatchers.any;
5960
import static org.mockito.Mockito.doReturn;
@@ -486,4 +487,26 @@ public FiCaSchedulerApp getApplicationAttempt(
486487
cs.submitResourceCommitRequest(clusterResource,
487488
csAssignment);
488489
}
490+
491+
/**
492+
* Wait until the condition is met or timeout.
493+
* @param condition condition to check
494+
* @param intervalMs interval to check the condition
495+
* @param timeoutMs timeout
496+
* @return true if the condition is met before timeout, false otherwise
497+
* @throws InterruptedException
498+
*/
499+
public static boolean waitForUntilTimeout(Supplier<Boolean> condition,
500+
long intervalMs, long timeoutMs) throws InterruptedException {
501+
long startTime = System.currentTimeMillis();
502+
while (!condition.get()) {
503+
long elapsedTime = System.currentTimeMillis() - startTime;
504+
if (elapsedTime > timeoutMs) {
505+
return false;
506+
}
507+
long remainingTime = timeoutMs - elapsedTime;
508+
Thread.sleep(Math.min(intervalMs, remainingTime > 0 ? remainingTime : 0));
509+
}
510+
return true;
511+
}
489512
}

0 commit comments

Comments
 (0)