YARN-903. Changed ContainerManager to suppress unnecessary warnings when stopping already stopped containers. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1509560 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2013-08-02 06:54:22 +00:00
parent ee350ee2ae
commit 5b7889f9a7
7 changed files with 214 additions and 23 deletions
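
In short: the NodeManager now remembers containers it has recently stopped for a bounded period, and a repeated stop request for such a container succeeds quietly instead of producing a warning and an "is not handled by this NodeManager" error. A minimal sketch of that decision, with illustrative names and ContainerId simplified to String (this is not the committed code, which follows in the per-file diffs below):

import java.util.HashSet;
import java.util.Set;

// Sketch only: the decision the patched stop-container path makes when the NM
// no longer holds the container. The real cache (NodeStatusUpdaterImpl below)
// also expires entries after a configurable duration.
class StopDecisionSketch {
  private final Set<String> recentlyStopped = new HashSet<String>();

  void rememberStopped(String containerId) {
    recentlyStopped.add(containerId);
  }

  // Called for a stop request naming a container this NodeManager does not hold.
  void onStopOfUnknownContainer(String containerId) throws Exception {
    if (recentlyStopped.contains(containerId)) {
      return; // stopped a moment ago: succeed silently instead of warning
    }
    throw new Exception(
        "Container " + containerId + " is not handled by this NodeManager");
  }
}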

CHANGES.txt

@@ -54,6 +54,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-573. Shared data structures in Public Localizer and Private Localizer
are not Thread safe. (Omkar Vinit Joshi via jlowe)
YARN-903. Changed ContainerManager to suppress unnecessary warnings when
stopping already stopped containers. (Omkar Vinit Joshi via vinodkv)
Release 2.1.0-beta - 2013-08-06
INCOMPATIBLE CHANGES

NodeManager.java

@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.SecurityUtil;
@@ -456,4 +457,10 @@ public class NodeManager extends CompositeService
Configuration conf = new YarnConfiguration();
nodeManager.initAndStartNodeManager(conf, false);
}
@VisibleForTesting
@Private
public NodeStatusUpdater getNodeStatusUpdater() {
return nodeStatusUpdater;
}
}

NodeStatusUpdater.java

@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public interface NodeStatusUpdater extends Service {
@@ -28,4 +29,8 @@ public interface NodeStatusUpdater extends Service {
NodeStatus getNodeStatusAndUpdateContainersInContext();
long getRMIdentifier();
public boolean isContainerRecentlyStopped(ContainerId containerId);
public void clearFinishedContainersFromCache();
}

NodeStatusUpdaterImpl.java

@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -68,6 +69,9 @@ import com.google.common.annotations.VisibleForTesting;
public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdater {
public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS =
YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers";
private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
private final Object heartbeatMonitor = new Object();
@@ -88,6 +92,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private Map<ApplicationId, Long> appTokenKeepAliveMap =
new HashMap<ApplicationId, Long>();
private Random keepAliveDelayRandom = new Random();
// It will be used to track recently stopped containers on node manager.
private final Map<ContainerId, Long> recentlyStoppedContainers;
// Duration for which to track recently stopped container.
private long durationToTrackStoppedContainers;
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
@@ -103,6 +111,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
this.recentlyStoppedContainers =
new LinkedHashMap<ContainerId, Long>();
}
@Override
@@ -129,11 +139,27 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
// Default duration to track stopped containers on nodemanager is 10Min.
// This should not be assigned very large value as it will remember all the
// containers stopped during that time.
durationToTrackStoppedContainers =
conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
600000);
if (durationToTrackStoppedContainers < 0) {
String message = "Invalid configuration for "
+ YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default "
+ "value is 10Min(600000).";
LOG.error(message);
throw new YarnException(message);
}
if (LOG.isDebugEnabled()) {
LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :"
+ durationToTrackStoppedContainers);
}
super.serviceInit(conf);
LOG.info("Initialized nodemanager for " + nodeId + ":" +
" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
" virtual-cores=" + virtualCores);
}
@Override
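
The serviceInit change above reads the tracking duration, in milliseconds, from the new property (default 600000, i.e. 10 minutes; negative values are rejected with a YarnException). A small sketch of overriding it, using only the constant added by this patch plus standard Configuration APIs; the new test in TestNodeStatusUpdater further down does the same thing via conf.set():

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;

public class TrackingDurationConfigSketch {
  public static YarnConfiguration shortTrackingConf() {
    YarnConfiguration conf = new YarnConfiguration();
    // Value is in milliseconds; the patch default is 600000 (10 minutes) and
    // negative values are rejected in serviceInit above.
    conf.setLong(
        NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
        60 * 1000L); // track stopped containers for one minute
    return conf;
  }
}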
@@ -290,7 +316,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (containerStatus.getState() == ContainerState.COMPLETE) {
// Remove
i.remove();
// Adding to finished containers cache. Cache will keep it around at
// least for #durationToTrackStoppedContainers duration. In the
// subsequent call to stop container it will get removed from cache.
addStoppedContainersToCache(containerId);
LOG.info("Removed completed container " + containerId);
}
}
@@ -340,6 +370,46 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
public boolean isContainerRecentlyStopped(ContainerId containerId) {
synchronized (recentlyStoppedContainers) {
return recentlyStoppedContainers.containsKey(containerId);
}
}
@Private
@VisibleForTesting
public void addStoppedContainersToCache(ContainerId containerId) {
synchronized (recentlyStoppedContainers) {
removeVeryOldStoppedContainersFromCache();
recentlyStoppedContainers.put(containerId,
System.currentTimeMillis() + durationToTrackStoppedContainers);
}
}
@Override
public void clearFinishedContainersFromCache() {
synchronized (recentlyStoppedContainers) {
recentlyStoppedContainers.clear();
}
}
@Private
@VisibleForTesting
public void removeVeryOldStoppedContainersFromCache() {
synchronized (recentlyStoppedContainers) {
long currentTime = System.currentTimeMillis();
Iterator<ContainerId> i =
recentlyStoppedContainers.keySet().iterator();
while (i.hasNext()) {
if (recentlyStoppedContainers.get(i.next()) < currentTime) {
i.remove();
} else {
break;
}
}
}
}
@Override
public long getRMIdentifier() {
return this.rmIdentifier;
@@ -455,4 +525,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new Thread(statusUpdaterRunnable, "Node Status Updater");
statusUpdater.start();
}
}
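
The cache above leans on LinkedHashMap's insertion order: every entry is stored with an expiry timestamp of now + durationToTrackStoppedContainers, so the oldest entry is always at the head and the purge loop can stop at the first unexpired entry. A stripped-down sketch of that pattern, mirroring addStoppedContainersToCache and removeVeryOldStoppedContainersFromCache with illustrative names (keys simplified to String; not the committed class):

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class ExpiringCacheSketch {
  private final Map<String, Long> cache = new LinkedHashMap<String, Long>();
  private final long ttlMillis;

  ExpiringCacheSketch(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  synchronized void add(String key) {
    purgeExpired(); // keep the map from growing without bound
    cache.put(key, System.currentTimeMillis() + ttlMillis);
  }

  synchronized boolean contains(String key) {
    return cache.containsKey(key);
  }

  synchronized void purgeExpired() {
    long now = System.currentTimeMillis();
    Iterator<Map.Entry<String, Long>> it = cache.entrySet().iterator();
    while (it.hasNext()) {
      if (it.next().getValue() < now) {
        it.remove(); // expired; the head of the map is always the oldest entry
      } else {
        break;       // first unexpired entry: nothing newer can be expired
      }
    }
  }
}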

ContainerManagerImpl.java

@@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -581,17 +582,24 @@ public class ContainerManagerImpl extends CompositeService implements
authorizeGetAndStopContainerRequest(containerID, container, true,
nmTokenIdentifier);
if (container == null) {
if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
throw RPCUtil.getRemoteException("Container " + containerIDStr
+ " is not handled by this NodeManager");
}
} else {
dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
"Container killed by the ApplicationMaster."));
NMAuditLogger.logSuccess(container.getUser(),
AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
.getApplicationAttemptId().getApplicationId(), containerID);
// TODO: Move this code to appropriate place once kill_container is
// implemented.
nodeStatusUpdater.sendOutofBandHeartBeat();
}
}
/**
@@ -627,6 +635,15 @@ public class ContainerManagerImpl extends CompositeService implements
authorizeGetAndStopContainerRequest(containerID, container, false,
nmTokenIdentifier);
if (container == null) {
if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
throw RPCUtil.getRemoteException("Container " + containerIDStr
+ " was recently stopped on node manager.");
} else {
throw RPCUtil.getRemoteException("Container " + containerIDStr
+ " is not handled by this NodeManager");
}
}
ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
LOG.info("Returning " + containerStatus);
return containerStatus;
@@ -658,17 +675,11 @@ public class ContainerManagerImpl extends CompositeService implements
container.getContainerId());
} else {
LOG.warn(identifier.getApplicationAttemptId()
+ " attempted to get get status for non-application container : "
+ " attempted to get status for non-application container : "
+ container.getContainerId().toString());
}
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " is not started by this application attempt.");
}
if (container == null) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " is not handled by this NodeManager");
}
}
class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
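
Seen from a client, the net effect is that a repeated stopContainers call for a container the NodeManager has already removed no longer shows up in getFailedRequests, while getContainerStatuses for it fails with the "was recently stopped" message until the cache entry expires or is cleared. A hedged sketch of that interaction; the request/response types are the real YARN ones, but the surrounding method and the pre-built proxy are assumptions:

import java.util.Arrays;

import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;

class StopTwiceSketch {
  // Stops the same container twice; with this patch the second call should not
  // report the container as failed while it is still in the recently-stopped cache.
  static void stopTwice(ContainerManagementProtocol proxy, ContainerId cId)
      throws Exception {
    StopContainersRequest request =
        StopContainersRequest.newInstance(Arrays.asList(cId));
    proxy.stopContainers(request);                                  // first stop
    StopContainersResponse second = proxy.stopContainers(request);  // repeated stop
    boolean reportedFailed = second.getFailedRequests() != null
        && second.getFailedRequests().containsKey(cId);
    // Before this change the repeated stop produced an
    // "is not handled by this NodeManager" failure for cId; now reportedFailed
    // stays false for as long as cId is tracked as recently stopped.
    System.out.println("repeated stop reported failure: " + reportedFailed);
  }
}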

TestNodeStatusUpdater.java

@@ -426,7 +426,7 @@ public class TestNodeStatusUpdater {
return this.nodeStatusUpdater;
}
public MyNodeStatusUpdater3 getNodeStatusUpdater() {
return this.nodeStatusUpdater;
}
}
@@ -745,6 +745,40 @@ public class TestNodeStatusUpdater {
lfs.delete(new Path(basedir.getPath()), true);
}
@Test(timeout = 90000)
public void testRecentlyFinishedContainers() throws Exception {
NodeManager nm = new NodeManager();
YarnConfiguration conf = new YarnConfiguration();
conf.set(
NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
"10000");
nm.init(conf);
NodeStatusUpdaterImpl nodeStatusUpdater =
(NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
nodeStatusUpdater.addStoppedContainersToCache(cId);
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
long time1 = System.currentTimeMillis();
int waitInterval = 15;
while (waitInterval-- > 0
&& nodeStatusUpdater.isContainerRecentlyStopped(cId)) {
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
Thread.sleep(1000);
}
long time2 = System.currentTimeMillis();
// By this time the container will be removed from cache. need to verify.
Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000);
}
@Test
public void testNMRegistration() throws InterruptedException {
nm = new NodeManager() {

TestContainerManagerSecurity.java

@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -260,6 +262,11 @@ public class TestContainerManagerSecurity {
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
validContainerToken, validNMToken, true).contains(sb.toString()));
// Container is removed from node manager's memory by this time.
// trying to stop the container. It should not throw any exception.
testStopContainer(rpc, validAppAttemptId, validNode, validContainerId,
validNMToken, false);
// Rolling over master key twice so that we can check whether older keys
// are used for authentication.
rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
@@ -267,13 +274,25 @@ public class TestContainerManagerSecurity {
rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
// trying get container status. Now saved nmToken should be used for
// authentication... It should complain saying container was recently
// stopped.
sb = new StringBuilder("Container ");
sb.append(validContainerId);
sb.append(" was recently stopped on node manager");
Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
validContainerId, validNMToken, true).contains(sb.toString()));
// Now lets remove the container from nm-memory
nm.getNodeStatusUpdater().clearFinishedContainersFromCache();
// This should fail as container is removed from recently tracked finished
// containers.
sb = new StringBuilder("Container ");
sb.append(validContainerId.toString());
sb.append(" is not handled by this NodeManager");
Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
validContainerId, validNMToken, false).contains(sb.toString()));
}
private void waitForContainerToFinishOnNM(ContainerId containerId) {
@@ -315,6 +334,23 @@ public class TestContainerManagerSecurity {
Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId()
== nmTokenSecretManagerRM.getCurrentKey().getKeyId()));
}
private String testStopContainer(YarnRPC rpc,
ApplicationAttemptId appAttemptId, NodeId nodeId,
ContainerId containerId, Token nmToken, boolean isExceptionExpected) {
try {
stopContainer(rpc, nmToken,
Arrays.asList(new ContainerId[] { containerId }), appAttemptId,
nodeId);
if (isExceptionExpected) {
fail("Exception was expected!!");
}
return "";
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
}
}
private String testGetContainer(YarnRPC rpc,
ApplicationAttemptId appAttemptId, NodeId nodeId,
@@ -334,7 +370,7 @@
}
}
private String testStartContainer(YarnRPC rpc,
ApplicationAttemptId appAttemptId, NodeId nodeId,
org.apache.hadoop.yarn.api.records.Token containerToken,
org.apache.hadoop.yarn.api.records.Token nmToken,
@@ -352,6 +388,29 @@
}
}
private void stopContainer(YarnRPC rpc, Token nmToken,
List<ContainerId> containerId, ApplicationAttemptId appAttemptId,
NodeId nodeId) throws Exception {
StopContainersRequest request =
StopContainersRequest.newInstance(containerId);
ContainerManagementProtocol proxy = null;
try {
proxy =
getContainerManagementProtocolProxy(rpc, nmToken, nodeId,
appAttemptId.toString());
StopContainersResponse response = proxy.stopContainers(request);
if (response.getFailedRequests() != null &&
response.getFailedRequests().containsKey(containerId)) {
parseAndThrowException(response.getFailedRequests().get(containerId)
.deSerialize());
}
} catch (Exception e) {
if (proxy != null) {
rpc.stopProxy(proxy, conf);
}
}
}
private void
getContainerStatus(YarnRPC rpc,
org.apache.hadoop.yarn.api.records.Token nmToken,