YARN-5195. RM intermittently crashed with NPE while handling APP_ATTEMPT_REMOVED event when async-scheduling enabled in CapacityScheduler. (sandflee via wangda)
(cherry picked from commit d62e121ffc)
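
In short: with asynchronous scheduling enabled, the scheduling thread can invoke allocateContainersToNode() on a FiCaSchedulerNode that a concurrent NODE_REMOVED event has already evicted from the scheduler's nodeTracker. Bookkeeping done against that stale node then surfaces as a NullPointerException when the APP_ATTEMPT_REMOVED event is later processed (this is the crash path the new test below reproduces). The patch guards the entry point with a nodeTracker.exists() check so scheduling on an already-removed node is skipped.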
parent 55fa19d576
commit 77e0b6d1bf
@@ -1209,11 +1209,18 @@ public class CapacityScheduler extends
   }
 
   @VisibleForTesting
-  protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+  public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
     if (rmContext.isWorkPreservingRecoveryEnabled()
         && !rmContext.isSchedulerReadyForAllocatingContainers()) {
       return;
     }
+
+    if (!nodeTracker.exists(node.getNodeID())) {
+      LOG.info("Skipping scheduling as the node " + node.getNodeID() +
+          " has been removed");
+      return;
+    }
+
     // reset allocation and reservation stats before we start doing any work
     updateSchedulerHealth(lastNodeUpdateTime, node,
         new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
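
For readers outside the YARN code base, here is a minimal, self-contained sketch of the guard pattern the hunk above introduces: re-check that a node is still tracked before scheduling on it, because a concurrent event may have removed it. All names here (RemovedNodeGuardSketch, the map standing in for nodeTracker) are hypothetical illustrations, not the YARN API:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class RemovedNodeGuardSketch {
      // Stand-in for the scheduler's node tracker (hypothetical, not YARN code).
      private static final ConcurrentMap<String, Integer> nodeTracker =
          new ConcurrentHashMap<>();

      // Mirrors the patched allocateContainersToNode(): bail out early if the
      // node has been removed by a concurrent event, instead of dereferencing
      // state that no longer exists.
      static void allocateContainersToNode(String nodeId) {
        Integer availableMB = nodeTracker.get(nodeId);
        if (availableMB == null) {
          System.out.println("Skipping scheduling as the node " + nodeId
              + " has been removed");
          return;
        }
        System.out.println("Allocating on " + nodeId
            + ", available MB = " + availableMB);
      }

      public static void main(String[] args) {
        nodeTracker.put("127.0.0.1:1235", 10240);
        allocateContainersToNode("127.0.0.1:1235"); // node present: allocates
        nodeTracker.remove("127.0.0.1:1235");       // concurrent NODE_REMOVED
        allocateContainersToNode("127.0.0.1:1235"); // node gone: safely skipped
      }
    }

The real method additionally holds the scheduler lock (it is synchronized), so the exists() check and the work that follows it are atomic with respect to node removal.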
@@ -3375,4 +3375,44 @@ public class TestCapacityScheduler {
     Assert.assertEquals(availableResource.getMemorySize(), 0);
     Assert.assertEquals(availableResource.getVirtualCores(), 0);
   }
+
+  @Test
+  public void testSchedulingOnRemovedNode() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
+        false);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    RMApp app = rm.submitApp(100);
+    rm.drainEvents();
+
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10240, 10);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+    //remove nm2 to keep am alive
+    MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10240, 10);
+
+    am.allocate(ResourceRequest.ANY, 2048, 1, null);
+
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+    FiCaSchedulerNode node =
+        (FiCaSchedulerNode)
+        scheduler.getNodeTracker().getNode(nm2.getNodeId());
+    scheduler.handle(new NodeRemovedSchedulerEvent(
+        rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
+    // schedulerNode is removed, try allocate a container
+    scheduler.allocateContainersToNode(node);
+
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
+        new AppAttemptRemovedSchedulerEvent(
+            am.getApplicationAttemptId(),
+            RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent1);
+    rm.stop();
+  }
 }
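
The new test drives exactly the racy sequence against a real CapacityScheduler: it registers a second NodeManager (so the AM on nm1 stays alive), removes nm2 via a NodeRemovedSchedulerEvent while holding a stale FiCaSchedulerNode reference, replays allocation against that stale node, and then fires APP_ATTEMPT_REMOVED. Before the fix, that last step triggered the NPE; with the nodeTracker.exists() guard, the stale allocation is skipped and the attempt removal completes cleanly.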