diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 4b656759cc0..a3a6b30593c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -1311,6 +1313,7 @@ public class RMNodeImpl implements RMNode, EventHandler { new ArrayList(); List completedContainers = new ArrayList(); + int numRemoteRunningContainers = 0; for (ContainerStatus remoteContainer : containerStatuses) { ContainerId containerId = remoteContainer.getContainerId(); @@ -1344,6 +1347,7 @@ public class RMNodeImpl implements RMNode, EventHandler { if (remoteContainer.getState() == ContainerState.RUNNING) { // Process only GUARANTEED containers in the RM. if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) { + ++numRemoteRunningContainers; if (!launchedContainers.contains(containerId)) { // Just launched container. RM knows about it the first time. launchedContainers.add(containerId); @@ -1366,12 +1370,45 @@ public class RMNodeImpl implements RMNode, EventHandler { completedContainers.add(remoteContainer); } } + completedContainers.addAll(findLostContainers( + numRemoteRunningContainers, containerStatuses)); + if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers, completedContainers)); } } + private List findLostContainers(int numRemoteRunning, + List containerStatuses) { + if (numRemoteRunning >= launchedContainers.size()) { + return Collections.emptyList(); + } + Set nodeContainers = + new HashSet(numRemoteRunning); + List lostContainers = new ArrayList( + launchedContainers.size() - numRemoteRunning); + for (ContainerStatus remoteContainer : containerStatuses) { + if (remoteContainer.getState() == ContainerState.RUNNING + && remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) { + nodeContainers.add(remoteContainer.getContainerId()); + } + } + Iterator iter = launchedContainers.iterator(); + while (iter.hasNext()) { + ContainerId containerId = iter.next(); + if (!nodeContainers.contains(containerId)) { + String diag = "Container " + containerId + + " was running but not reported from " + nodeId; + LOG.warn(diag); + lostContainers.add(SchedulerUtils.createAbnormalContainerStatus( + containerId, diag)); + iter.remove(); + } + } + return lostContainers; + } + private void handleLogAggregationStatus( List logAggregationReportsForApps) { for (LogAggregationReport report : logAggregationReportsForApps) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 04ea51c99ff..2e2bef7fbc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -57,6 +58,8 @@ public class MockNM { private MasterKey currentContainerTokenMasterKey; private MasterKey currentNMTokenMasterKey; private String version; + private Map containerStats = + new HashMap(); public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory @@ -106,14 +109,12 @@ public class MockNM { } public void containerIncreaseStatus(Container container) throws Exception { - Map> conts = new HashMap<>(); ContainerStatus containerStatus = BuilderUtils.newContainerStatus( container.getId(), ContainerState.RUNNING, "Success", 0, container.getResource()); - conts.put(container.getId().getApplicationAttemptId().getApplicationId(), - Collections.singletonList(containerStatus)); List increasedConts = Collections.singletonList(container); - nodeHeartbeat(conts, increasedConts, true, ++responseId); + nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, + true, ++responseId); } public RegisterNodeManagerResponse registerNode() throws Exception { @@ -147,18 +148,27 @@ public class MockNM { memory = (int) newResource.getMemorySize(); vCores = newResource.getVirtualCores(); } + containerStats.clear(); + if (containerReports != null) { + for (NMContainerStatus report : containerReports) { + if (report.getContainerState() != ContainerState.COMPLETE) { + containerStats.put(report.getContainerId(), + ContainerStatus.newInstance(report.getContainerId(), + report.getContainerState(), report.getDiagnostics(), + report.getContainerExitStatus())); + } + } + } return registrationResponse; } public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { - return nodeHeartbeat(new HashMap>(), - isHealthy, ++responseId); + return nodeHeartbeat(Collections.emptyList(), + Collections.emptyList(), isHealthy, ++responseId); } public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, long containerId, ContainerState containerState) throws Exception { - HashMap> nodeUpdate = - new HashMap>(1); ContainerStatus containerStatus = BuilderUtils.newContainerStatus( BuilderUtils.newContainerId(attemptId, containerId), containerState, "Success", 0, BuilderUtils.newResource(memory, vCores)); @@ -166,8 +176,8 @@ public class MockNM { new ArrayList(1); containerStatusList.add(containerStatus); Log.info("ContainerStatus: " + containerStatus); - nodeUpdate.put(attemptId.getApplicationId(), containerStatusList); - return nodeHeartbeat(nodeUpdate, true); + return nodeHeartbeat(containerStatusList, + Collections.emptyList(), true, ++responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy, int resId) throws Exception { - return nodeHeartbeat(conts, new ArrayList(), isHealthy, resId); + ArrayList updatedStats = new ArrayList(); + for (List stats : conts.values()) { + updatedStats.addAll(stats); + } + return nodeHeartbeat(updatedStats, Collections.emptyList(), + isHealthy, resId); } - public NodeHeartbeatResponse nodeHeartbeat(Map> conts, List increasedConts, - boolean isHealthy, int resId) throws Exception { + public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, + List increasedConts, boolean isHealthy, int resId) + throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setResponseId(resId); status.setNodeId(nodeId); - for (Map.Entry> entry : conts.entrySet()) { - Log.info("entry.getValue() " + entry.getValue()); - status.setContainersStatuses(entry.getValue()); + ArrayList completedContainers = new ArrayList(); + for (ContainerStatus stat : updatedStats) { + if (stat.getState() == ContainerState.COMPLETE) { + completedContainers.add(stat.getContainerId()); + } + containerStats.put(stat.getContainerId(), stat); + } + status.setContainersStatuses( + new ArrayList(containerStats.values())); + for (ContainerId cid : completedContainers) { + containerStats.remove(cid); } status.setIncreasedContainers(increasedConts); NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 16fe99890d1..83a7c73cfbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -34,6 +34,7 @@ import java.util.Random; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -1021,4 +1022,47 @@ public class TestRMNodeTransitions { Resource originalCapacity = node.getOriginalTotalCapability(); assertEquals("Original total capability not null after recommission", null, originalCapacity); } + + @Test + public void testDisappearingContainer() { + ContainerId cid1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(1, 1), 1), 1); + ContainerId cid2 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(2, 2), 2), 2); + ArrayList containerStats = + new ArrayList(); + containerStats.add(ContainerStatus.newInstance(cid1, + ContainerState.RUNNING, "", -1)); + containerStats.add(ContainerStatus.newInstance(cid2, + ContainerState.RUNNING, "", -1)); + node = getRunningNode(); + node.handle(getMockRMNodeStatusEvent(containerStats)); + assertEquals("unexpected number of running containers", + 2, node.getLaunchedContainers().size()); + Assert.assertTrue("first container not running", + node.getLaunchedContainers().contains(cid1)); + Assert.assertTrue("second container not running", + node.getLaunchedContainers().contains(cid2)); + assertEquals("already completed containers", + 0, completedContainers.size()); + containerStats.remove(0); + node.handle(getMockRMNodeStatusEvent(containerStats)); + assertEquals("expected one container to be completed", + 1, completedContainers.size()); + ContainerStatus cs = completedContainers.get(0); + assertEquals("first container not the one that completed", + cid1, cs.getContainerId()); + assertEquals("completed container not marked complete", + ContainerState.COMPLETE, cs.getState()); + assertEquals("completed container not marked aborted", + ContainerExitStatus.ABORTED, cs.getExitStatus()); + Assert.assertTrue("completed container not marked missing", + cs.getDiagnostics().contains("not reported")); + assertEquals("unexpected number of running containers", + 1, node.getLaunchedContainers().size()); + Assert.assertTrue("second container not running", + node.getLaunchedContainers().contains(cid2)); + } }