YARN-2825. Container leak on NM. Contributed by Jian He
(cherry picked from commit c3d475070a
)
This commit is contained in:
parent
3ef876712c
commit
a5764cb783
|
@ -879,6 +879,8 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2753. Fixed a bunch of bugs in the NodeLabelsManager classes. (Zhihai xu
|
||||
via vinodkv)
|
||||
|
||||
YARN-2825. Container leak on NM (Jian He via jlowe)
|
||||
|
||||
Release 2.5.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -66,8 +66,8 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
|
@ -115,6 +115,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
private Runnable statusUpdaterRunnable;
|
||||
private Thread statusUpdater;
|
||||
private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
||||
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
|
||||
|
||||
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||
|
@ -446,19 +447,27 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
|
||||
@VisibleForTesting
|
||||
@Private
|
||||
public void removeCompletedContainersFromContext(
|
||||
public void removeOrTrackCompletedContainersFromContext(
|
||||
List<ContainerId> containerIds) throws IOException {
|
||||
Set<ContainerId> removedContainers = new HashSet<ContainerId>();
|
||||
|
||||
// If the AM has pulled the completedContainer it can be removed
|
||||
for (ContainerId containerId : containerIds) {
|
||||
context.getContainers().remove(containerId);
|
||||
removedContainers.add(containerId);
|
||||
pendingContainersToRemove.addAll(containerIds);
|
||||
Iterator<ContainerId> iter = pendingContainersToRemove.iterator();
|
||||
while (iter.hasNext()) {
|
||||
ContainerId containerId = iter.next();
|
||||
// remove the container only if the container is at DONE state
|
||||
Container nmContainer = context.getContainers().get(containerId);
|
||||
if (nmContainer != null && nmContainer.getContainerState().equals(
|
||||
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
|
||||
context.getContainers().remove(containerId);
|
||||
removedContainers.add(containerId);
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
||||
if (!removedContainers.isEmpty()) {
|
||||
LOG.info("Removed completed containers from NM context: " +
|
||||
removedContainers);
|
||||
LOG.info("Removed completed containers from NM context: "
|
||||
+ removedContainers);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -601,7 +610,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
// because these completed containers will be reported back to RM
|
||||
// when NM re-registers with RM.
|
||||
// Only remove the cleanedup containers that are acked
|
||||
removeCompletedContainersFromContext(response
|
||||
removeOrTrackCompletedContainersFromContext(response
|
||||
.getContainersToBeRemovedFromNM());
|
||||
|
||||
lastHeartBeatID = response.getResponseId();
|
||||
|
|
|
@ -30,9 +30,11 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -866,18 +868,57 @@ public class TestNodeStatusUpdater {
|
|||
public ContainerState getCurrentState() {
|
||||
return ContainerState.COMPLETE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
|
||||
return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE;
|
||||
}
|
||||
};
|
||||
|
||||
ContainerId runningContainerId =
|
||||
ContainerId.newInstance(appAttemptId, 3);
|
||||
Token runningContainerToken =
|
||||
BuilderUtils.newContainerToken(runningContainerId, "anyHost",
|
||||
1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
|
||||
"password".getBytes(), 0);
|
||||
Container runningContainer =
|
||||
new ContainerImpl(conf, null, null, null, null, null,
|
||||
BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) {
|
||||
@Override
|
||||
public ContainerState getCurrentState() {
|
||||
return ContainerState.RUNNING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
|
||||
return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING;
|
||||
}
|
||||
};
|
||||
|
||||
nm.getNMContext().getApplications().putIfAbsent(appId,
|
||||
mock(Application.class));
|
||||
nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
|
||||
Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
|
||||
nm.getNMContext().getContainers()
|
||||
.put(runningContainerId, runningContainer);
|
||||
|
||||
Assert.assertEquals(2, nodeStatusUpdater.getContainerStatuses().size());
|
||||
|
||||
List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
|
||||
ackedContainers.add(cId);
|
||||
ackedContainers.add(runningContainerId);
|
||||
|
||||
nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers);
|
||||
Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty());
|
||||
nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers);
|
||||
|
||||
Set<ContainerId> containerIdSet = new HashSet<ContainerId>();
|
||||
for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) {
|
||||
containerIdSet.add(status.getContainerId());
|
||||
}
|
||||
|
||||
Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().size() == 1);
|
||||
// completed container is removed;
|
||||
Assert.assertFalse(containerIdSet.contains(anyCompletedContainer));
|
||||
// running container is not removed;
|
||||
Assert.assertTrue(containerIdSet.contains(runningContainerId));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1467,6 +1508,13 @@ public class TestNodeStatusUpdater {
|
|||
when(container.getCurrentState()).thenReturn(containerStatus.getState());
|
||||
when(container.getContainerId()).thenReturn(
|
||||
containerStatus.getContainerId());
|
||||
if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
|
||||
when(container.getContainerState())
|
||||
.thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE);
|
||||
} else if (containerStatus.getState().equals(ContainerState.RUNNING)) {
|
||||
when(container.getContainerState())
|
||||
.thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING);
|
||||
}
|
||||
return container;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue