YARN-2617. Fixed NM to not send duplicate container status whose app is not running. Contributed by Jun Gong
This commit is contained in:
parent
c2fa5d1c42
commit
3ef1cf187f
|
@ -515,6 +515,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2630. Prevented previous AM container status from being acquired by the
|
YARN-2630. Prevented previous AM container status from being acquired by the
|
||||||
current restarted AM. (Jian He via zjshen)
|
current restarted AM. (Jian He via zjshen)
|
||||||
|
|
||||||
|
YARN-2617. Fixed NM to not send duplicate container status whose app is not
|
||||||
|
running. (Jun Gong via jianhe)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
@ -354,20 +355,24 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
ContainerId containerId = container.getContainerId();
|
ContainerId containerId = container.getContainerId();
|
||||||
ApplicationId applicationId = container.getContainerId()
|
ApplicationId applicationId = container.getContainerId()
|
||||||
.getApplicationAttemptId().getApplicationId();
|
.getApplicationAttemptId().getApplicationId();
|
||||||
if (!this.context.getApplications().containsKey(applicationId)) {
|
|
||||||
context.getContainers().remove(containerId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
|
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
|
||||||
container.cloneAndGetContainerStatus();
|
container.cloneAndGetContainerStatus();
|
||||||
containerStatuses.add(containerStatus);
|
containerStatuses.add(containerStatus);
|
||||||
if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
|
if (containerStatus.getState() == ContainerState.COMPLETE) {
|
||||||
|
if (isApplicationStopped(applicationId)) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(applicationId + " is completing, " + " remove "
|
||||||
|
+ containerId + " from NM context.");
|
||||||
|
}
|
||||||
|
context.getContainers().remove(containerId);
|
||||||
|
} else {
|
||||||
// Adding to finished containers cache. Cache will keep it around at
|
// Adding to finished containers cache. Cache will keep it around at
|
||||||
// least for #durationToTrackStoppedContainers duration. In the
|
// least for #durationToTrackStoppedContainers duration. In the
|
||||||
// subsequent call to stop container it will get removed from cache.
|
// subsequent call to stop container it will get removed from cache.
|
||||||
addCompletedContainer(container.getContainerId());
|
addCompletedContainer(container.getContainerId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Sending out " + containerStatuses.size()
|
LOG.debug("Sending out " + containerStatuses.size()
|
||||||
+ " container statuses: " + containerStatuses);
|
+ " container statuses: " + containerStatuses);
|
||||||
|
@ -396,7 +401,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
NMContainerStatus status =
|
NMContainerStatus status =
|
||||||
container.getNMContainerStatus();
|
container.getNMContainerStatus();
|
||||||
containerStatuses.add(status);
|
containerStatuses.add(status);
|
||||||
if (status.getContainerState().equals(ContainerState.COMPLETE)) {
|
if (status.getContainerState() == ContainerState.COMPLETE) {
|
||||||
// Adding to finished containers cache. Cache will keep it around at
|
// Adding to finished containers cache. Cache will keep it around at
|
||||||
// least for #durationToTrackStoppedContainers duration. In the
|
// least for #durationToTrackStoppedContainers duration. In the
|
||||||
// subsequent call to stop container it will get removed from cache.
|
// subsequent call to stop container it will get removed from cache.
|
||||||
|
@ -408,6 +413,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
return containerStatuses;
|
return containerStatuses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isApplicationStopped(ApplicationId applicationId) {
|
||||||
|
if (!this.context.getApplications().containsKey(applicationId)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
ApplicationState applicationState = this.context.getApplications().get(
|
||||||
|
applicationId).getApplicationState();
|
||||||
|
if (applicationState == ApplicationState.FINISHING_CONTAINERS_WAIT
|
||||||
|
|| applicationState == ApplicationState.APPLICATION_RESOURCES_CLEANINGUP
|
||||||
|
|| applicationState == ApplicationState.FINISHED) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addCompletedContainer(ContainerId containerId) {
|
public void addCompletedContainer(ContainerId containerId) {
|
||||||
synchronized (recentlyStoppedContainers) {
|
synchronized (recentlyStoppedContainers) {
|
||||||
|
|
|
@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
@ -891,15 +892,23 @@ public class TestNodeStatusUpdater {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
nm.getNMContext().getApplications().putIfAbsent(appId,
|
Application application = mock(Application.class);
|
||||||
mock(Application.class));
|
when(application.getApplicationState()).thenReturn(ApplicationState.RUNNING);
|
||||||
|
nm.getNMContext().getApplications().putIfAbsent(appId, application);
|
||||||
nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
|
nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
|
||||||
|
|
||||||
Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
|
Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
|
||||||
|
|
||||||
|
when(application.getApplicationState()).thenReturn(
|
||||||
|
ApplicationState.FINISHING_CONTAINERS_WAIT);
|
||||||
|
// The completed container will be sent one time. Then we will delete it.
|
||||||
|
Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
|
||||||
|
Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
|
||||||
|
|
||||||
|
nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
|
||||||
nm.getNMContext().getApplications().remove(appId);
|
nm.getNMContext().getApplications().remove(appId);
|
||||||
nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList
|
// The completed container will be sent one time. Then we will delete it.
|
||||||
<ContainerId>());
|
Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
|
||||||
Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
|
Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue