YARN-4227. Ignore expired containers from removed nodes in FairScheduler. (Wilfred Spiegelenburg via rchiang)

(cherry picked from commit bc2d67d6c10619716ef7acce263f3269a86c3150)
This commit is contained in:
Ray Chiang 2018-01-08 15:32:25 -08:00
parent eadd3cecf9
commit 18c3982851
2 changed files with 78 additions and 10 deletions

View File

@ -693,27 +693,36 @@ public class FairScheduler extends
ApplicationId appId = ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId(); container.getId().getApplicationAttemptId().getApplicationId();
if (application == null) { if (application == null) {
LOG.info( LOG.info("Container " + container + " of finished application " +
"Container " + container + " of" + " finished application " + appId appId + " completed with event " + event);
+ " completed with event " + event);
return; return;
} }
// Get the node on which the container was allocated // Get the node on which the container was allocated
FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); NodeId nodeID = container.getNodeId();
FSSchedulerNode node = getFSSchedulerNode(nodeID);
// node could be null if the thread was waiting for the lock and the node
// was removed in another thread
if (rmContainer.getState() == RMContainerState.RESERVED) { if (rmContainer.getState() == RMContainerState.RESERVED) {
application.unreserve(rmContainer.getReservedSchedulerKey(), node); if (node != null) {
} else{ application.unreserve(rmContainer.getReservedSchedulerKey(), node);
} else if (LOG.isDebugEnabled()) {
LOG.debug("Skipping unreserve on removed node: " + nodeID);
}
} else {
application.containerCompleted(rmContainer, containerStatus, event); application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(rmContainer.getContainerId(), false); if (node != null) {
node.releaseContainer(rmContainer.getContainerId(), false);
} else if (LOG.isDebugEnabled()) {
LOG.debug("Skipping container release on removed node: " + nodeID);
}
updateRootQueueMetrics(); updateRootQueueMetrics();
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Application attempt " + application.getApplicationAttemptId() LOG.debug("Application attempt " + application.getApplicationAttemptId()
+ " released container " + container.getId() + " on node: " + node + " released container " + container.getId() + " on node: " +
+ " with event: " + event); (node == null ? nodeID : node) + " with event: " + event);
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -5353,4 +5354,62 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertTrue(parent.dumpState().equals( assertTrue(parent.dumpState().equals(
parentQueueString + ", " + childQueueString)); parentQueueString + ", " + childQueueString));
} }
@Test
public void testCompletedContainerOnRemovedNode() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(2048), 2,
"127.0.0.2");
scheduler.handle(new NodeAddedSchedulerEvent(node));
// Create application attempt
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
createMockRMApp(appAttemptId);
scheduler.addApplication(appAttemptId.getApplicationId(), "root.queue1",
"user1", false);
scheduler.addApplicationAttempt(appAttemptId, false, false);
// Create container request that goes to a specific node.
// Without the 2nd and 3rd request we do not get live containers
List<ResourceRequest> ask1 = new ArrayList<>();
ResourceRequest request1 =
createResourceRequest(1024, node.getHostName(), 1, 1, true);
ask1.add(request1);
ResourceRequest request2 =
createResourceRequest(1024, node.getRackName(), 1, 1, false);
ask1.add(request2);
ResourceRequest request3 =
createResourceRequest(1024, ResourceRequest.ANY, 1, 1, false);
ask1.add(request3);
// Perform allocation
scheduler.allocate(appAttemptId, ask1, new ArrayList<ContainerId>(), null,
null, NULL_UPDATE_REQUESTS);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node));
// Get the allocated containers for the application (list can not be null)
Collection<RMContainer> clist = scheduler.getSchedulerApp(appAttemptId)
.getLiveContainers();
Assert.assertEquals(1, clist.size());
// Make sure that we remove the correct node (should never fail)
RMContainer rmc = clist.iterator().next();
NodeId containerNodeID = rmc.getAllocatedNode();
assertEquals(node.getNodeID(), containerNodeID);
// Remove node
scheduler.handle(new NodeRemovedSchedulerEvent(node));
// Call completedContainer() should not fail even if the node has been
// removed
scheduler.completedContainer(rmc,
SchedulerUtils.createAbnormalContainerStatus(rmc.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.EXPIRE);
}
} }