YARN-4227. Ignore expired containers from removed nodes in FairScheduler. (Wilfred Spiegelenburg via rchiang)
commit 59ab5da0a0
parent 73ff09b79a
FairScheduler.java:

@@ -669,27 +669,36 @@ public class FairScheduler extends
       ApplicationId appId =
           container.getId().getApplicationAttemptId().getApplicationId();
       if (application == null) {
-        LOG.info(
-            "Container " + container + " of" + " finished application " + appId
-                + " completed with event " + event);
+        LOG.info("Container " + container + " of finished application " +
+            appId + " completed with event " + event);
         return;
       }
 
       // Get the node on which the container was allocated
-      FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
-
+      NodeId nodeID = container.getNodeId();
+      FSSchedulerNode node = getFSSchedulerNode(nodeID);
+      // node could be null if the thread was waiting for the lock and the node
+      // was removed in another thread
       if (rmContainer.getState() == RMContainerState.RESERVED) {
-        application.unreserve(rmContainer.getReservedSchedulerKey(), node);
+        if (node != null) {
+          application.unreserve(rmContainer.getReservedSchedulerKey(), node);
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping unreserve on removed node: " + nodeID);
+        }
       } else {
         application.containerCompleted(rmContainer, containerStatus, event);
-        node.releaseContainer(rmContainer.getContainerId(), false);
+        if (node != null) {
+          node.releaseContainer(rmContainer.getContainerId(), false);
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping container release on removed node: " + nodeID);
+        }
         updateRootQueueMetrics();
       }
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Application attempt " + application.getApplicationAttemptId()
-            + " released container " + container.getId() + " on node: " + node
-            + " with event: " + event);
+            + " released container " + container.getId() + " on node: " +
+            (node == null ? nodeID : node) + " with event: " + event);
       }
     } finally {
       writeLock.unlock();
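The heart of the fix is a tolerate-the-race guard: completedContainer() now resolves the NodeId once and accepts a null node, because node removal can run on another thread while the completion event waits for the scheduler's write lock. The sketch below distills that pattern outside Hadoop; NodeGuardSketch, its registry map, and its releaseContainer() are hypothetical stand-ins for illustration, not the FairScheduler API.

// A minimal, self-contained sketch of the guard pattern (illustrative names,
// not the Hadoop classes). A completion event can arrive after another thread
// has removed the node, so the lookup must tolerate a null result.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class NodeGuardSketch {

  /** Hypothetical stand-in for FSSchedulerNode. */
  static class Node {
    final String id;

    Node(String id) {
      this.id = id;
    }

    void releaseContainer(String containerId) {
      System.out.println("Released " + containerId + " on node " + id);
    }
  }

  // Node registry keyed by node ID; removal may happen on another thread.
  private final ConcurrentMap<String, Node> nodes = new ConcurrentHashMap<>();

  void addNode(String nodeId) {
    nodes.put(nodeId, new Node(nodeId));
  }

  void removeNode(String nodeId) {
    nodes.remove(nodeId);
  }

  // Completion handler mirroring the patched flow: look the node up once,
  // and skip the node-side release when the node has already been removed.
  void completedContainer(String nodeId, String containerId) {
    Node node = nodes.get(nodeId);
    if (node != null) {
      node.releaseContainer(containerId);
    } else {
      System.out.println("Skipping container release on removed node: " + nodeId);
    }
  }

  public static void main(String[] args) {
    NodeGuardSketch scheduler = new NodeGuardSketch();
    scheduler.addNode("node-1");
    scheduler.removeNode("node-1");                  // node disappears first
    scheduler.completedContainer("node-1", "c-001"); // stale completion is safe
  }
}

Note that the patch guards only the node-side bookkeeping: unreserve() and releaseContainer() are skipped with a debug log when the node is gone, while the application-side containerCompleted() accounting still runs.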
TestFairScheduler.java:

@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -5355,4 +5356,62 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Unexpected state dump string",
         parentQueueString + ", " + childQueueString, parent.dumpState());
   }
+
+  @Test
+  public void testCompletedContainerOnRemovedNode() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(2048), 2,
+        "127.0.0.2");
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+    // Create application attempt
+    ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
+    createMockRMApp(appAttemptId);
+    scheduler.addApplication(appAttemptId.getApplicationId(), "root.queue1",
+        "user1", false);
+    scheduler.addApplicationAttempt(appAttemptId, false, false);
+
+    // Create a container request that goes to a specific node.
+    // Without the 2nd and 3rd request we do not get live containers.
+    List<ResourceRequest> ask1 = new ArrayList<>();
+    ResourceRequest request1 =
+        createResourceRequest(1024, node.getHostName(), 1, 1, true);
+    ask1.add(request1);
+    ResourceRequest request2 =
+        createResourceRequest(1024, node.getRackName(), 1, 1, false);
+    ask1.add(request2);
+    ResourceRequest request3 =
+        createResourceRequest(1024, ResourceRequest.ANY, 1, 1, false);
+    ask1.add(request3);
+
+    // Perform allocation
+    scheduler.allocate(appAttemptId, ask1, new ArrayList<ContainerId>(), null,
+        null, NULL_UPDATE_REQUESTS);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+    // Get the allocated containers for the application (list cannot be null)
+    Collection<RMContainer> clist = scheduler.getSchedulerApp(appAttemptId)
+        .getLiveContainers();
+    Assert.assertEquals(1, clist.size());
+
+    // Make sure that we remove the correct node (should never fail)
+    RMContainer rmc = clist.iterator().next();
+    NodeId containerNodeID = rmc.getAllocatedNode();
+    assertEquals(node.getNodeID(), containerNodeID);
+
+    // Remove node
+    scheduler.handle(new NodeRemovedSchedulerEvent(node));
+
+    // Calling completedContainer() should not fail even if the node has
+    // been removed
+    scheduler.completedContainer(rmc,
+        SchedulerUtils.createAbnormalContainerStatus(rmc.getContainerId(),
+            SchedulerUtils.COMPLETED_APPLICATION),
+        RMContainerEventType.EXPIRE);
+  }
 }
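The test reproduces the interleaving deterministically: allocate a single container on a known node, remove that node with a NodeRemovedSchedulerEvent, then deliver the container's EXPIRE completion. Without the null guard, completedContainer() would dereference an FSSchedulerNode that no longer exists; with the guard, the call returns cleanly and the test passes by not throwing.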