YARN-1783. Fixed a bug in NodeManager's status-updater that was losing completed container statuses when NodeManager is forced to resync by the ResourceManager. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1575437 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-07 22:36:47 +00:00
parent b06cc16f7d
commit 1c4047b0e4
7 changed files with 324 additions and 172 deletions

View File

@ -411,11 +411,15 @@ Release 2.4.0 - UNRELEASED
configuration-provider when booting up. (Xuan Gong via vinodkv) configuration-provider when booting up. (Xuan Gong via vinodkv)
YARN-1768. Fixed error message being too verbose when killing a non-existent YARN-1768. Fixed error message being too verbose when killing a non-existent
application application. (Tsuyoshi OZAWA via raviprak)
YARN-1774. FS: Submitting to non-leaf queue throws NPE. (Anubhav Dhoot and YARN-1774. FS: Submitting to non-leaf queue throws NPE. (Anubhav Dhoot and
Karthik Kambatla via kasha) Karthik Kambatla via kasha)
YARN-1783. Fixed a bug in NodeManager's status-updater that was losing
completed container statuses when NodeManager is forced to resync by the
ResourceManager. (Jian He via vinodkv)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -229,7 +229,8 @@ public class NodeManager extends CompositeService
containerManager.setBlockNewContainerRequests(true); containerManager.setBlockNewContainerRequests(true);
LOG.info("Cleaning up running containers on resync"); LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync(); containerManager.cleanupContainersOnNMResync();
((NodeStatusUpdaterImpl) nodeStatusUpdater).rebootNodeStatusUpdater(); ((NodeStatusUpdaterImpl) nodeStatusUpdater)
.rebootNodeStatusUpdaterAndRegisterWithRM();
} catch (YarnRuntimeException e) { } catch (YarnRuntimeException e) {
LOG.fatal("Error while rebooting NodeStatusUpdater.", e); LOG.fatal("Error while rebooting NodeStatusUpdater.", e);
shutDown(); shutDown();
@ -243,7 +244,7 @@ public class NodeManager extends CompositeService
private NodeId nodeId = null; private NodeId nodeId = null;
private final ConcurrentMap<ApplicationId, Application> applications = private final ConcurrentMap<ApplicationId, Application> applications =
new ConcurrentHashMap<ApplicationId, Application>(); new ConcurrentHashMap<ApplicationId, Application>();
private final ConcurrentMap<ContainerId, Container> containers = protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>(); new ConcurrentSkipListMap<ContainerId, Container>();
private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMContainerTokenSecretManager containerTokenSecretManager;

View File

@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public interface NodeStatusUpdater extends Service { public interface NodeStatusUpdater extends Service {
void sendOutofBandHeartBeat(); void sendOutofBandHeartBeat();
NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId);
long getRMIdentifier(); long getRMIdentifier();
public boolean isContainerRecentlyStopped(ContainerId containerId); public boolean isContainerRecentlyStopped(ContainerId containerId);

View File

@ -23,12 +23,14 @@ import java.net.ConnectException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -93,11 +95,19 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private Map<ApplicationId, Long> appTokenKeepAliveMap = private Map<ApplicationId, Long> appTokenKeepAliveMap =
new HashMap<ApplicationId, Long>(); new HashMap<ApplicationId, Long>();
private Random keepAliveDelayRandom = new Random(); private Random keepAliveDelayRandom = new Random();
// It will be used to track recently stopped containers on node manager. // It will be used to track recently stopped containers on node manager, this
// is to avoid the misleading no-such-container exception messages on NM, when
// the AM finishes it informs the RM to stop the may-be-already-completed
// containers.
private final Map<ContainerId, Long> recentlyStoppedContainers; private final Map<ContainerId, Long> recentlyStoppedContainers;
// Duration for which to track recently stopped container. // Duration for which to track recently stopped container.
private long durationToTrackStoppedContainers; private long durationToTrackStoppedContainers;
// This is used to track the current completed containers when nodeheartBeat
// is called. These completed containers will be removed from NM context after
// nodeHeartBeat succeeds and the response from the nodeHeartBeat is
// processed.
private final Set<ContainerId> previousCompletedContainers;
private final NodeHealthCheckerService healthChecker; private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics; private final NodeManagerMetrics metrics;
@ -114,6 +124,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.metrics = metrics; this.metrics = metrics;
this.recentlyStoppedContainers = this.recentlyStoppedContainers =
new LinkedHashMap<ContainerId, Long>(); new LinkedHashMap<ContainerId, Long>();
this.previousCompletedContainers = new HashSet<ContainerId>();
} }
@Override @Override
@ -194,7 +205,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
super.serviceStop(); super.serviceStop();
} }
protected void rebootNodeStatusUpdater() { protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
// Interrupt the updater. // Interrupt the updater.
this.isStopped = true; this.isStopped = true;
@ -235,8 +246,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@VisibleForTesting @VisibleForTesting
protected void registerWithRM() protected void registerWithRM()
throws YarnException, IOException { throws YarnException, IOException {
List<ContainerStatus> containerStatuses = List<ContainerStatus> containerStatuses = getContainerStatuses();
this.updateAndGetContainerStatuses();
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerStatuses); nodeManagerVersionId, containerStatuses);
@ -321,62 +331,72 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return appList; return appList;
} }
@Override private NodeStatus getNodeStatus(int responseId) {
public NodeStatus getNodeStatusAndUpdateContainersInContext(
int responseId) {
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
nodeHealthStatus.setLastHealthReportTime( nodeHealthStatus.setLastHealthReportTime(healthChecker
healthChecker.getLastHealthReportTime()); .getLastHealthReportTime());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ ", " + nodeHealthStatus.getHealthReport()); + ", " + nodeHealthStatus.getHealthReport());
} }
List<ContainerStatus> containersStatuses = updateAndGetContainerStatuses(); List<ContainerStatus> containersStatuses = getContainerStatuses();
if (LOG.isDebugEnabled()) {
LOG.debug(this.nodeId + " sending out status for " LOG.debug(this.nodeId + " sending out status for "
+ containersStatuses.size() + " containers"); + containersStatuses.size() + " containers");
NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, }
containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus); NodeStatus nodeStatus =
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
createKeepAliveApplicationList(), nodeHealthStatus);
return nodeStatus; return nodeStatus;
} }
/* // Iterate through the NMContext and clone and get all the containers'
* It will return current container statuses. If any container has // statuses. If it's a completed container, add into the
* COMPLETED then it will be removed from context. // recentlyStoppedContainers and previousCompletedContainers collections.
*/ @VisibleForTesting
private List<ContainerStatus> updateAndGetContainerStatuses() { protected List<ContainerStatus> getContainerStatuses() {
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>(); List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i = for (Container container : this.context.getContainers().values()) {
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
ContainerId containerId = e.getKey();
Container container = e.getValue();
// Clone the container to send it to the RM
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus(); container.cloneAndGetContainerStatus();
containerStatuses.add(containerStatus); containerStatuses.add(containerStatus);
if (LOG.isDebugEnabled()) { if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
LOG.debug("Sending out status for container: " + containerStatus);
}
if (containerStatus.getState() == ContainerState.COMPLETE) {
// Remove
i.remove();
// Adding to finished containers cache. Cache will keep it around at // Adding to finished containers cache. Cache will keep it around at
// least for #durationToTrackStoppedContainers duration. In the // least for #durationToTrackStoppedContainers duration. In the
// subsequent call to stop container it will get removed from cache. // subsequent call to stop container it will get removed from cache.
addStoppedContainersToCache(containerId); updateStoppedContainersInCache(container.getContainerId());
addCompletedContainer(container);
LOG.info("Removed completed container " + containerId);
} }
} }
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out container statuses: " + containerStatuses);
}
return containerStatuses; return containerStatuses;
} }
private void addCompletedContainer(Container container) {
synchronized (previousCompletedContainers) {
previousCompletedContainers.add(container.getContainerId());
}
}
private void removeCompletedContainersFromContext() {
synchronized (previousCompletedContainers) {
if (!previousCompletedContainers.isEmpty()) {
for (ContainerId containerId : previousCompletedContainers) {
this.context.getContainers().remove(containerId);
}
LOG.info("Removed completed containers from NM context: "
+ previousCompletedContainers);
previousCompletedContainers.clear();
}
}
}
private void trackAppsForKeepAlive(List<ApplicationId> appIds) { private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) { if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
for (ApplicationId appId : appIds) { for (ApplicationId appId : appIds) {
@ -409,7 +429,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Private @Private
@VisibleForTesting @VisibleForTesting
public void addStoppedContainersToCache(ContainerId containerId) { public void updateStoppedContainersInCache(ContainerId containerId) {
synchronized (recentlyStoppedContainers) { synchronized (recentlyStoppedContainers) {
removeVeryOldStoppedContainersFromCache(); removeVeryOldStoppedContainersFromCache();
recentlyStoppedContainers.put(containerId, recentlyStoppedContainers.put(containerId,
@ -457,8 +477,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// Send heartbeat // Send heartbeat
try { try {
NodeHeartbeatResponse response = null; NodeHeartbeatResponse response = null;
NodeStatus nodeStatus = NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);
getNodeStatusAndUpdateContainersInContext(lastHeartBeatID);
NodeHeartbeatRequest request = NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus, NodeHeartbeatRequest.newInstance(nodeStatus,
@ -494,6 +513,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
break; break;
} }
// Explicitly put this method after checking the resync response. We
// don't want to remove the completed containers before resync
// because these completed containers will be reported back to RM
// when NM re-registers with RM.
removeCompletedContainersFromContext();
lastHeartBeatID = response.getResponseId(); lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response List<ContainerId> containersToCleanup = response
.getContainersToCleanup(); .getContainersToCleanup();

View File

@ -54,7 +54,11 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
public MockNodeStatusUpdater(Context context, Dispatcher dispatcher, public MockNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics); super(context, dispatcher, healthChecker, metrics);
resourceTracker = new MockResourceTracker(); resourceTracker = createResourceTracker();
}
protected ResourceTracker createResourceTracker() {
return new MockResourceTracker();
} }
@Override @Override
@ -66,7 +70,7 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
return; return;
} }
private static class MockResourceTracker implements ResourceTracker { protected static class MockResourceTracker implements ResourceTracker {
private int heartBeatID; private int heartBeatID;
@Override @Override

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@ -43,9 +45,17 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -162,6 +172,118 @@ public class TestNodeManagerResync {
} }
Assert.assertTrue("NM shutdown not called.",isNMShutdownCalled.get()); Assert.assertTrue("NM shutdown not called.",isNMShutdownCalled.get());
nm.stop();
}
// This is to test when NM gets the resync response from last heart beat, it
// should be able to send the already-sent-via-last-heart-beat container
// statuses again when it re-register with RM.
@Test
public void testNMSentContainerStatusOnResync() throws Exception {
final ContainerStatus testCompleteContainer =
TestNodeStatusUpdater.createContainerStatus(2, ContainerState.COMPLETE);
final Container container =
TestNodeStatusUpdater.getMockContainer(testCompleteContainer);
NodeManager nm = new NodeManager() {
int registerCount = 0;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new TestNodeStatusUpdaterResync(context, dispatcher,
healthChecker, metrics) {
@Override
protected ResourceTracker createResourceTracker() {
return new MockResourceTracker() {
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException {
if (registerCount == 0) {
// first register, no containers info.
try {
Assert.assertEquals(0, request.getContainerStatuses()
.size());
} catch (AssertionError error) {
error.printStackTrace();
assertionFailedInThread.set(true);
}
// put the completed container into the context
getNMContext().getContainers().put(
testCompleteContainer.getContainerId(), container);
} else {
// second register contains the completed container info.
List<ContainerStatus> statuses =
request.getContainerStatuses();
try {
Assert.assertEquals(1, statuses.size());
Assert.assertEquals(testCompleteContainer.getContainerId(),
statuses.get(0).getContainerId());
} catch (AssertionError error) {
error.printStackTrace();
assertionFailedInThread.set(true);
}
}
registerCount++;
return super.registerNodeManager(request);
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) {
// first heartBeat contains the completed container info
List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses();
try {
Assert.assertEquals(1, statuses.size());
Assert.assertEquals(testCompleteContainer.getContainerId(),
statuses.get(0).getContainerId());
} catch (AssertionError error) {
error.printStackTrace();
assertionFailedInThread.set(true);
}
// notify RESYNC on first heartbeat.
return YarnServerBuilderUtils.newNodeHeartbeatResponse(1,
NodeAction.RESYNC, null, null, null, null, 1000L);
}
};
}
};
}
};
YarnConfiguration conf = createNMConfig();
nm.init(conf);
nm.start();
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
}
Assert.assertFalse(assertionFailedInThread.get());
nm.stop();
}
// This can be used as a common base class for testing NM resync behavior.
class TestNodeStatusUpdaterResync extends MockNodeStatusUpdater {
public TestNodeStatusUpdaterResync(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
}
@Override
protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
try {
// Wait here so as to sync with the main test thread.
super.rebootNodeStatusUpdaterAndRegisterWithRM();
syncBarrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
} catch (AssertionError ae) {
ae.printStackTrace();
assertionFailedInThread.set(true);
}
}
} }
private YarnConfiguration createNMConfig() { private YarnConfiguration createNMConfig() {
@ -206,14 +328,14 @@ public class TestNodeManagerResync {
} }
@Override @Override
protected void rebootNodeStatusUpdater() { protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers = .containermanager.container.Container> containers =
getNMContext().getContainers(); getNMContext().getContainers();
try { try {
// ensure that containers are empty before restart nodeStatusUpdater // ensure that containers are empty before restart nodeStatusUpdater
Assert.assertTrue(containers.isEmpty()); Assert.assertTrue(containers.isEmpty());
super.rebootNodeStatusUpdater(); super.rebootNodeStatusUpdaterAndRegisterWithRM();
syncBarrier.await(); syncBarrier.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
} catch (BrokenBarrierException e) { } catch (BrokenBarrierException e) {
@ -278,7 +400,7 @@ public class TestNodeManagerResync {
} }
@Override @Override
protected void rebootNodeStatusUpdater() { protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers = .containermanager.container.Container> containers =
getNMContext().getContainers(); getNMContext().getContainers();
@ -286,7 +408,7 @@ public class TestNodeManagerResync {
try { try {
// ensure that containers are empty before restart nodeStatusUpdater // ensure that containers are empty before restart nodeStatusUpdater
Assert.assertTrue(containers.isEmpty()); Assert.assertTrue(containers.isEmpty());
super.rebootNodeStatusUpdater(); super.rebootNodeStatusUpdaterAndRegisterWithRM();
// After this point new containers are free to be launched, except // After this point new containers are free to be launched, except
// containers from previous RM // containers from previous RM
// Wait here so as to sync with the main test thread. // Wait here so as to sync with the main test thread.

View File

@ -34,7 +34,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -117,8 +116,6 @@ public class TestNodeStatusUpdater {
private boolean triggered = false; private boolean triggered = false;
private Configuration conf; private Configuration conf;
private NodeManager nm; private NodeManager nm;
private boolean containerStatusBackupSuccessfully = true;
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
@Before @Before
@ -304,6 +301,8 @@ public class TestNodeStatusUpdater {
} }
} }
// Test NodeStatusUpdater sends the right container statuses each time it
// heart beats.
private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl { private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker; public ResourceTracker resourceTracker;
@ -555,6 +554,8 @@ public class TestNodeStatusUpdater {
} }
} }
// Test NodeStatusUpdater sends the right container statuses each time it
// heart beats.
private class MyResourceTracker4 implements ResourceTracker { private class MyResourceTracker4 implements ResourceTracker {
public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL;
@ -567,10 +568,9 @@ public class TestNodeStatusUpdater {
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, RegisterNodeManagerRequest request) throws YarnException, IOException {
IOException { RegisterNodeManagerResponse response =
RegisterNodeManagerResponse response = recordFactory recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
.newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction); response.setNodeAction(registerNodeAction);
response.setContainerTokenMasterKey(createMasterKey()); response.setContainerTokenMasterKey(createMasterKey());
response.setNMTokenMasterKey(createMasterKey()); response.setNMTokenMasterKey(createMasterKey());
@ -586,64 +586,85 @@ public class TestNodeStatusUpdater {
.size(), 0); .size(), 0);
Assert.assertEquals(context.getContainers().size(), 0); Assert.assertEquals(context.getContainers().size(), 0);
} else if (heartBeatID == 1) { } else if (heartBeatID == 1) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses() List<ContainerStatus> statuses =
.size(), 5); request.getNodeStatus().getContainersStatuses();
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() Assert.assertEquals(statuses.size(), 2);
.get(0).getState() == ContainerState.RUNNING Assert.assertEquals(context.getContainers().size(), 2);
&& request.getNodeStatus().getContainersStatuses().get(0)
.getContainerId().getId() == 1); ContainerStatus containerStatus2 =
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() createContainerStatus(2, ContainerState.RUNNING);
.get(1).getState() == ContainerState.RUNNING ContainerStatus containerStatus3 =
&& request.getNodeStatus().getContainersStatuses().get(1) createContainerStatus(3, ContainerState.COMPLETE);
.getContainerId().getId() == 2); boolean container2Exist = false, container3Exist = false;
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() for (ContainerStatus status : statuses) {
.get(2).getState() == ContainerState.COMPLETE if (status.getContainerId().equals(
&& request.getNodeStatus().getContainersStatuses().get(2) containerStatus2.getContainerId())) {
.getContainerId().getId() == 3); Assert.assertTrue(status.getState().equals(
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() containerStatus2.getState()));
.get(3).getState() == ContainerState.COMPLETE container2Exist = true;
&& request.getNodeStatus().getContainersStatuses().get(3) }
.getContainerId().getId() == 4); if (status.getContainerId().equals(
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() containerStatus3.getContainerId())) {
.get(4).getState() == ContainerState.RUNNING Assert.assertTrue(status.getState().equals(
&& request.getNodeStatus().getContainersStatuses().get(4) containerStatus3.getState()));
.getContainerId().getId() == 5); container3Exist = true;
throw new java.net.ConnectException("Lost the heartbeat response"); }
}
Assert.assertTrue(container2Exist && container3Exist);
// should throw exception that can be retried by the
// nodeStatusUpdaterRunnable, otherwise nm just shuts down and the
// test passes.
throw new YarnRuntimeException("Lost the heartbeat response");
} else if (heartBeatID == 2) { } else if (heartBeatID == 2) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses() List<ContainerStatus> statuses =
.size(), 7); request.getNodeStatus().getContainersStatuses();
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() Assert.assertEquals(statuses.size(), 4);
.get(0).getState() == ContainerState.COMPLETE Assert.assertEquals(context.getContainers().size(), 4);
&& request.getNodeStatus().getContainersStatuses().get(0)
.getContainerId().getId() == 3); ContainerStatus containerStatus2 =
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() createContainerStatus(2, ContainerState.RUNNING);
.get(1).getState() == ContainerState.COMPLETE ContainerStatus containerStatus3 =
&& request.getNodeStatus().getContainersStatuses().get(1) createContainerStatus(3, ContainerState.COMPLETE);
.getContainerId().getId() == 4); ContainerStatus containerStatus4 =
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() createContainerStatus(4, ContainerState.RUNNING);
.get(2).getState() == ContainerState.RUNNING ContainerStatus containerStatus5 =
&& request.getNodeStatus().getContainersStatuses().get(2) createContainerStatus(5, ContainerState.COMPLETE);
.getContainerId().getId() == 1);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() boolean container2Exist = false, container3Exist = false, container4Exist =
.get(3).getState() == ContainerState.RUNNING false, container5Exist = false;
&& request.getNodeStatus().getContainersStatuses().get(3) for (ContainerStatus status : statuses) {
.getContainerId().getId() == 2); if (status.getContainerId().equals(
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() containerStatus2.getContainerId())) {
.get(4).getState() == ContainerState.RUNNING Assert.assertTrue(status.getState().equals(
&& request.getNodeStatus().getContainersStatuses().get(4) containerStatus2.getState()));
.getContainerId().getId() == 5); container2Exist = true;
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() }
.get(5).getState() == ContainerState.RUNNING if (status.getContainerId().equals(
&& request.getNodeStatus().getContainersStatuses().get(5) containerStatus3.getContainerId())) {
.getContainerId().getId() == 6); Assert.assertTrue(status.getState().equals(
Assert.assertTrue(request.getNodeStatus().getContainersStatuses() containerStatus3.getState()));
.get(6).getState() == ContainerState.COMPLETE container3Exist = true;
&& request.getNodeStatus().getContainersStatuses().get(6) }
.getContainerId().getId() == 7); if (status.getContainerId().equals(
containerStatus4.getContainerId())) {
Assert.assertTrue(status.getState().equals(
containerStatus4.getState()));
container4Exist = true;
}
if (status.getContainerId().equals(
containerStatus5.getContainerId())) {
Assert.assertTrue(status.getState().equals(
containerStatus5.getState()));
container5Exist = true;
}
}
Assert.assertTrue(container2Exist && container3Exist
&& container4Exist && container5Exist);
} }
} catch (AssertionError error) { } catch (AssertionError error) {
LOG.info(error); error.printStackTrace();
containerStatusBackupSuccessfully = false; assertionFailedInThread.set(true);
} finally { } finally {
heartBeatID++; heartBeatID++;
} }
@ -651,9 +672,7 @@ public class TestNodeStatusUpdater {
nodeStatus.setResponseId(heartBeatID); nodeStatus.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse = NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
heartBeatNodeAction, heartBeatNodeAction, null, null, null, null, 1000L);
null, null, null,
null, 1000L);
return nhResponse; return nhResponse;
} }
} }
@ -761,7 +780,7 @@ public class TestNodeStatusUpdater {
ContainerId cId = ContainerId.newInstance(appAttemptId, 0); ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
nodeStatusUpdater.addStoppedContainersToCache(cId); nodeStatusUpdater.updateStoppedContainersInCache(cId);
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
long time1 = System.currentTimeMillis(); long time1 = System.currentTimeMillis();
@ -1119,7 +1138,8 @@ public class TestNodeStatusUpdater {
} }
/** /**
* Test completed containerStatus get back up when heart beat lost * Test completed containerStatus get back up when heart beat lost, and will
* be sent via next heart beat.
*/ */
@Test(timeout = 200000) @Test(timeout = 200000)
public void testCompletedContainerStatusBackup() throws Exception { public void testCompletedContainerStatusBackup() throws Exception {
@ -1150,7 +1170,7 @@ public class TestNodeStatusUpdater {
while (heartBeatID <= 3 && waitCount++ != 20) { while (heartBeatID <= 3 && waitCount++ != 20) {
Thread.sleep(500); Thread.sleep(500);
} }
if(!containerStatusBackupSuccessfully) { if(assertionFailedInThread.get()) {
Assert.fail("ContainerStatus Backup failed"); Assert.fail("ContainerStatus Backup failed");
} }
nm.stop(); nm.stop();
@ -1239,9 +1259,8 @@ public class TestNodeStatusUpdater {
nm.stop(); nm.stop();
} }
// Add new containers info into NM context each time node heart beats.
private class MyNMContext extends NMContext { private class MyNMContext extends NMContext {
ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
public MyNMContext( public MyNMContext(
NMContainerTokenSecretManager containerTokenSecretManager, NMContainerTokenSecretManager containerTokenSecretManager,
@ -1254,11 +1273,6 @@ public class TestNodeStatusUpdater {
if (heartBeatID == 0) { if (heartBeatID == 0) {
return containers; return containers;
} else if (heartBeatID == 1) { } else if (heartBeatID == 1) {
ContainerStatus containerStatus1 =
createContainerStatus(1, ContainerState.RUNNING);
Container container1 = getMockContainer(containerStatus1);
containers.put(containerStatus1.getContainerId(), container1);
ContainerStatus containerStatus2 = ContainerStatus containerStatus2 =
createContainerStatus(2, ContainerState.RUNNING); createContainerStatus(2, ContainerState.RUNNING);
Container container2 = getMockContainer(containerStatus2); Container container2 = getMockContainer(containerStatus2);
@ -1268,48 +1282,31 @@ public class TestNodeStatusUpdater {
createContainerStatus(3, ContainerState.COMPLETE); createContainerStatus(3, ContainerState.COMPLETE);
Container container3 = getMockContainer(containerStatus3); Container container3 = getMockContainer(containerStatus3);
containers.put(containerStatus3.getContainerId(), container3); containers.put(containerStatus3.getContainerId(), container3);
completedContainerStatusList.add(containerStatus3);
ContainerStatus containerStatus4 =
createContainerStatus(4, ContainerState.COMPLETE);
Container container4 = getMockContainer(containerStatus4);
containers.put(containerStatus4.getContainerId(), container4);
completedContainerStatusList.add(containerStatus4);
ContainerStatus containerStatus5 =
createContainerStatus(5, ContainerState.RUNNING);
Container container5 = getMockContainer(containerStatus5);
containers.put(containerStatus5.getContainerId(), container5);
return containers; return containers;
} else if (heartBeatID == 2) { } else if (heartBeatID == 2) {
ContainerStatus containerStatus6 = ContainerStatus containerStatus4 =
createContainerStatus(6, ContainerState.RUNNING); createContainerStatus(4, ContainerState.RUNNING);
Container container6 = getMockContainer(containerStatus6); Container container4 = getMockContainer(containerStatus4);
containers.put(containerStatus6.getContainerId(), container6); containers.put(containerStatus4.getContainerId(), container4);
ContainerStatus containerStatus7 =
createContainerStatus(7, ContainerState.COMPLETE);
Container container7 = getMockContainer(containerStatus7);
containers.put(containerStatus7.getContainerId(), container7);
completedContainerStatusList.add(containerStatus7);
ContainerStatus containerStatus5 =
createContainerStatus(5, ContainerState.COMPLETE);
Container container5 = getMockContainer(containerStatus5);
containers.put(containerStatus5.getContainerId(), container5);
return containers; return containers;
} else { } else {
containers.clear(); containers.clear();
return containers; return containers;
} }
} }
}
private ContainerStatus createContainerStatus(int id, public static ContainerStatus createContainerStatus(int id,
ContainerState containerState) { ContainerState containerState) {
ApplicationId applicationId = ApplicationId applicationId = ApplicationId.newInstance(0, 1);
BuilderUtils.newApplicationId(System.currentTimeMillis(), id);
ApplicationAttemptId applicationAttemptId = ApplicationAttemptId applicationAttemptId =
BuilderUtils.newApplicationAttemptId(applicationId, id); ApplicationAttemptId.newInstance(applicationId, 1);
ContainerId contaierId = ContainerId contaierId = ContainerId.newInstance(applicationAttemptId, id);
BuilderUtils.newContainerId(applicationAttemptId, id);
ContainerStatus containerStatus = ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(contaierId, containerState, BuilderUtils.newContainerStatus(contaierId, containerState,
"test_containerStatus: id=" + id + ", containerState: " "test_containerStatus: id=" + id + ", containerState: "
@ -1317,12 +1314,14 @@ public class TestNodeStatusUpdater {
return containerStatus; return containerStatus;
} }
private Container getMockContainer(ContainerStatus containerStatus) { public static Container getMockContainer(ContainerStatus containerStatus) {
Container container = mock(Container.class); ContainerImpl container = mock(ContainerImpl.class);
when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus); when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus);
when(container.getCurrentState()).thenReturn(containerStatus.getState());
when(container.getContainerId()).thenReturn(
containerStatus.getContainerId());
return container; return container;
} }
}
private void verifyNodeStartFailure(String errMessage) throws Exception { private void verifyNodeStartFailure(String errMessage) throws Exception {
Assert.assertNotNull("nm is null", nm); Assert.assertNotNull("nm is null", nm);