YARN-5197. RM leaks containers if running container disappears from node update. Contributed by Jason Lowe.
This commit is contained in:
parent
392e7e2d06
commit
ce2fcd932d
@ -36,6 +36,9 @@ Release 2.6.5 - UNRELEASED
|
||||
YARN-5206. RegistrySecurity includes id:pass in exception text if
|
||||
considered invalid (Steve Loughran via jlowe)
|
||||
|
||||
YARN-5197. RM leaks containers if running container disappears from
|
||||
node update. Contributed by Jason Lowe.
|
||||
|
||||
Release 2.6.4 - 2016-02-11
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -19,8 +19,10 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
@ -57,6 +59,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
||||
@ -866,6 +869,7 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
|
||||
new ArrayList<ContainerStatus>();
|
||||
List<ContainerStatus> completedContainers =
|
||||
new ArrayList<ContainerStatus>();
|
||||
int numRemoteRunningContainers = 0;
|
||||
for (ContainerStatus remoteContainer : containerStatuses) {
|
||||
ContainerId containerId = remoteContainer.getContainerId();
|
||||
|
||||
@ -887,6 +891,7 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
|
||||
|
||||
// Process running containers
|
||||
if (remoteContainer.getState() == ContainerState.RUNNING) {
|
||||
++numRemoteRunningContainers;
|
||||
if (!launchedContainers.contains(containerId)) {
|
||||
// Just launched container. RM knows about it the first time.
|
||||
launchedContainers.add(containerId);
|
||||
@ -898,10 +903,41 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
|
||||
completedContainers.add(remoteContainer);
|
||||
}
|
||||
}
|
||||
completedContainers.addAll(findLostContainers(
|
||||
numRemoteRunningContainers, containerStatuses));
|
||||
|
||||
if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
|
||||
nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
|
||||
completedContainers));
|
||||
}
|
||||
}
|
||||
|
||||
private List<ContainerStatus> findLostContainers(int numRemoteRunning,
|
||||
List<ContainerStatus> containerStatuses) {
|
||||
if (numRemoteRunning >= launchedContainers.size()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
Set<ContainerId> nodeContainers =
|
||||
new HashSet<ContainerId>(numRemoteRunning);
|
||||
List<ContainerStatus> lostContainers = new ArrayList<ContainerStatus>(
|
||||
launchedContainers.size() - numRemoteRunning);
|
||||
for (ContainerStatus remoteContainer : containerStatuses) {
|
||||
if (remoteContainer.getState() == ContainerState.RUNNING) {
|
||||
nodeContainers.add(remoteContainer.getContainerId());
|
||||
}
|
||||
}
|
||||
Iterator<ContainerId> iter = launchedContainers.iterator();
|
||||
while (iter.hasNext()) {
|
||||
ContainerId containerId = iter.next();
|
||||
if (!nodeContainers.contains(containerId)) {
|
||||
String diag = "Container " + containerId
|
||||
+ " was running but not reported from " + nodeId;
|
||||
LOG.warn(diag);
|
||||
lostContainers.add(SchedulerUtils.createAbnormalContainerStatus(
|
||||
containerId, diag));
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
return lostContainers;
|
||||
}
|
||||
}
|
||||
|
@ -20,12 +20,14 @@
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
@ -55,6 +57,8 @@ public class MockNM {
|
||||
private MasterKey currentContainerTokenMasterKey;
|
||||
private MasterKey currentNMTokenMasterKey;
|
||||
private String version;
|
||||
private Map<ContainerId, ContainerStatus> containerStats =
|
||||
new HashMap<ContainerId, ContainerStatus>();
|
||||
|
||||
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
|
||||
// scale vcores based on the requested memory
|
||||
@ -129,18 +133,27 @@ public RegisterNodeManagerResponse registerNode(
|
||||
this.currentContainerTokenMasterKey =
|
||||
registrationResponse.getContainerTokenMasterKey();
|
||||
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
|
||||
containerStats.clear();
|
||||
if (containerReports != null) {
|
||||
for (NMContainerStatus report : containerReports) {
|
||||
if (report.getContainerState() != ContainerState.COMPLETE) {
|
||||
containerStats.put(report.getContainerId(),
|
||||
ContainerStatus.newInstance(report.getContainerId(),
|
||||
report.getContainerState(), report.getDiagnostics(),
|
||||
report.getContainerExitStatus()));
|
||||
}
|
||||
}
|
||||
}
|
||||
return registrationResponse;
|
||||
}
|
||||
|
||||
public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
|
||||
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
|
||||
return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
|
||||
isHealthy, ++responseId);
|
||||
}
|
||||
|
||||
public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
|
||||
long containerId, ContainerState containerState) throws Exception {
|
||||
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
|
||||
new HashMap<ApplicationId, List<ContainerStatus>>(1);
|
||||
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
|
||||
BuilderUtils.newContainerId(attemptId, containerId), containerState,
|
||||
"Success", 0);
|
||||
@ -148,8 +161,7 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
|
||||
new ArrayList<ContainerStatus>(1);
|
||||
containerStatusList.add(containerStatus);
|
||||
Log.info("ContainerStatus: " + containerStatus);
|
||||
nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
|
||||
return nodeHeartbeat(nodeUpdate, true);
|
||||
return nodeHeartbeat(containerStatusList, true, ++responseId);
|
||||
}
|
||||
|
||||
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
@ -159,13 +171,30 @@ public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
|
||||
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
|
||||
ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
|
||||
for (List<ContainerStatus> stats : conts.values()) {
|
||||
updatedStats.addAll(stats);
|
||||
}
|
||||
return nodeHeartbeat(updatedStats, isHealthy, resId);
|
||||
}
|
||||
|
||||
public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
|
||||
boolean isHealthy, int resId) throws Exception {
|
||||
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
|
||||
NodeStatus status = Records.newRecord(NodeStatus.class);
|
||||
status.setResponseId(resId);
|
||||
status.setNodeId(nodeId);
|
||||
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
|
||||
Log.info("entry.getValue() " + entry.getValue());
|
||||
status.setContainersStatuses(entry.getValue());
|
||||
ArrayList<ContainerId> completedContainers = new ArrayList<ContainerId>();
|
||||
for (ContainerStatus stat : updatedStats) {
|
||||
if (stat.getState() == ContainerState.COMPLETE) {
|
||||
completedContainers.add(stat.getContainerId());
|
||||
}
|
||||
containerStats.put(stat.getContainerId(), stat);
|
||||
}
|
||||
status.setContainersStatuses(
|
||||
new ArrayList<ContainerStatus>(containerStats.values()));
|
||||
for (ContainerId cid : completedContainers) {
|
||||
containerStats.remove(cid);
|
||||
}
|
||||
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
|
||||
healthStatus.setHealthReport("");
|
||||
|
@ -32,7 +32,9 @@
|
||||
|
||||
import org.apache.hadoop.util.HostsFileReader;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
@ -147,6 +149,11 @@ public void tearDown() throws Exception {
|
||||
}
|
||||
|
||||
private RMNodeStatusEvent getMockRMNodeStatusEvent() {
|
||||
return getMockRMNodeStatusEvent(null);
|
||||
}
|
||||
|
||||
private RMNodeStatusEvent getMockRMNodeStatusEvent(
|
||||
List<ContainerStatus> containerStatus) {
|
||||
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
|
||||
|
||||
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
|
||||
@ -157,6 +164,9 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent() {
|
||||
doReturn(healthStatus).when(event).getNodeHealthStatus();
|
||||
doReturn(response).when(event).getLatestResponse();
|
||||
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
|
||||
if (containerStatus != null) {
|
||||
doReturn(containerStatus).when(event).getContainers();
|
||||
}
|
||||
return event;
|
||||
}
|
||||
|
||||
@ -617,4 +627,47 @@ public void testReconnnectUpdate() {
|
||||
null, null));
|
||||
Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisappearingContainer() {
|
||||
ContainerId cid1 = BuilderUtils.newContainerId(
|
||||
BuilderUtils.newApplicationAttemptId(
|
||||
BuilderUtils.newApplicationId(1, 1), 1), 1);
|
||||
ContainerId cid2 = BuilderUtils.newContainerId(
|
||||
BuilderUtils.newApplicationAttemptId(
|
||||
BuilderUtils.newApplicationId(2, 2), 2), 2);
|
||||
ArrayList<ContainerStatus> containerStats =
|
||||
new ArrayList<ContainerStatus>();
|
||||
containerStats.add(ContainerStatus.newInstance(cid1,
|
||||
ContainerState.RUNNING, "", -1));
|
||||
containerStats.add(ContainerStatus.newInstance(cid2,
|
||||
ContainerState.RUNNING, "", -1));
|
||||
node = getRunningNode();
|
||||
node.handle(getMockRMNodeStatusEvent(containerStats));
|
||||
assertEquals("unexpected number of running containers",
|
||||
2, node.getLaunchedContainers().size());
|
||||
Assert.assertTrue("first container not running",
|
||||
node.getLaunchedContainers().contains(cid1));
|
||||
Assert.assertTrue("second container not running",
|
||||
node.getLaunchedContainers().contains(cid2));
|
||||
assertEquals("already completed containers",
|
||||
0, completedContainers.size());
|
||||
containerStats.remove(0);
|
||||
node.handle(getMockRMNodeStatusEvent(containerStats));
|
||||
assertEquals("expected one container to be completed",
|
||||
1, completedContainers.size());
|
||||
ContainerStatus cs = completedContainers.get(0);
|
||||
assertEquals("first container not the one that completed",
|
||||
cid1, cs.getContainerId());
|
||||
assertEquals("completed container not marked complete",
|
||||
ContainerState.COMPLETE, cs.getState());
|
||||
assertEquals("completed container not marked aborted",
|
||||
ContainerExitStatus.ABORTED, cs.getExitStatus());
|
||||
Assert.assertTrue("completed container not marked missing",
|
||||
cs.getDiagnostics().contains("not reported"));
|
||||
assertEquals("unexpected number of running containers",
|
||||
1, node.getLaunchedContainers().size());
|
||||
Assert.assertTrue("second container not running",
|
||||
node.getLaunchedContainers().contains(cid2));
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user