YARN-72. NM should handle cleaning up containers when it shuts down. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1416484 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2012-12-03 12:12:11 +00:00
parent cbed126eec
commit 235749a8ab
7 changed files with 409 additions and 11 deletions

View File

@ -117,7 +117,11 @@ Release 2.0.3-alpha - Unreleased
YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy) YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy)
YARN-187. Add hierarchical queues to the fair scheduler. (Sandy Ryza via tomwhite) YARN-187. Add hierarchical queues to the fair scheduler.
(Sandy Ryza via tomwhite)
YARN-72. NM should handle cleaning up containers when it shuts down.
(Sandy Ryza via tomwhite)
Release 2.0.2-alpha - 2012-09-07 Release 2.0.2-alpha - 2012-09-07

View File

@ -25,13 +25,23 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
public class CMgrCompletedContainersEvent extends ContainerManagerEvent { public class CMgrCompletedContainersEvent extends ContainerManagerEvent {
private List<ContainerId> containerToCleanup; private List<ContainerId> containerToCleanup;
private Reason reason;
public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup) { public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup, Reason reason) {
super(ContainerManagerEventType.FINISH_CONTAINERS); super(ContainerManagerEventType.FINISH_CONTAINERS);
this.containerToCleanup = containersToCleanup; this.containerToCleanup = containersToCleanup;
this.reason = reason;
} }
public List<ContainerId> getContainersToCleanup() { public List<ContainerId> getContainersToCleanup() {
return this.containerToCleanup; return this.containerToCleanup;
} }
public Reason getReason() {
return reason;
}
public static enum Reason {
ON_SHUTDOWN, BY_RESOURCEMANAGER
}
} }

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
@ -62,13 +65,23 @@ public class NodeManager extends CompositeService implements
*/ */
public static final int SHUTDOWN_HOOK_PRIORITY = 30; public static final int SHUTDOWN_HOOK_PRIORITY = 30;
/**
* Extra duration to wait for containers to be killed on shutdown.
*/
private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
private static final Log LOG = LogFactory.getLog(NodeManager.class); private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
private ApplicationACLsManager aclsManager; private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker; private NodeHealthCheckerService nodeHealthChecker;
private LocalDirsHandlerService dirsHandler; private LocalDirsHandlerService dirsHandler;
private Context context;
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
private static CompositeServiceShutdownHook nodeManagerShutdownHook; private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private long waitForContainersOnShutdownMillis;
public NodeManager() { public NodeManager() {
super(NodeManager.class.getName()); super(NodeManager.class.getName());
} }
@ -115,7 +128,7 @@ public class NodeManager extends CompositeService implements
containerTokenSecretManager = new NMContainerTokenSecretManager(conf); containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
} }
Context context = new NMContext(containerTokenSecretManager); this.context = new NMContext(containerTokenSecretManager);
this.aclsManager = new ApplicationACLsManager(conf); this.aclsManager = new ApplicationACLsManager(conf);
@ -131,7 +144,7 @@ public class NodeManager extends CompositeService implements
addService(del); addService(del);
// NodeManager level dispatcher // NodeManager level dispatcher
AsyncDispatcher dispatcher = new AsyncDispatcher(); this.dispatcher = new AsyncDispatcher();
nodeHealthChecker = new NodeHealthCheckerService(); nodeHealthChecker = new NodeHealthCheckerService();
addService(nodeHealthChecker); addService(nodeHealthChecker);
@ -144,7 +157,7 @@ public class NodeManager extends CompositeService implements
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor); addService(nodeResourceMonitor);
ContainerManagerImpl containerManager = containerManager =
createContainerManager(context, exec, del, nodeStatusUpdater, createContainerManager(context, exec, del, nodeStatusUpdater,
this.aclsManager, dirsHandler); this.aclsManager, dirsHandler);
addService(containerManager); addService(containerManager);
@ -162,6 +175,13 @@ public class NodeManager extends CompositeService implements
// so that we make sure everything is up before registering with RM. // so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater); addService(nodeStatusUpdater);
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
SHUTDOWN_CLEANUP_SLOP_MS;
super.init(conf); super.init(conf);
// TODO add local dirs to del // TODO add local dirs to del
} }
@ -178,10 +198,45 @@ public class NodeManager extends CompositeService implements
@Override @Override
public void stop() { public void stop() {
cleanupContainers();
super.stop(); super.stop();
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
} }
@SuppressWarnings("unchecked")
private void cleanupContainers() {
Map<ContainerId, Container> containers = context.getContainers();
if (containers.isEmpty()) {
return;
}
LOG.info("Containers still running on shutdown: " + containers.keySet());
List<ContainerId> containerIds = new ArrayList<ContainerId>(containers.keySet());
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containerIds,
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
LOG.info("Waiting for containers to be killed");
long waitStartTime = System.currentTimeMillis();
while (!containers.isEmpty() &&
System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill", ex);
}
}
// All containers killed
if (containers.isEmpty()) {
LOG.info("All containers in DONE state");
} else {
LOG.info("Done waiting for containers to be killed. Still alive: " +
containers.keySet());
}
}
public static class NMContext implements Context { public static class NMContext implements Context {
private final NodeId nodeId = Records.newRecord(NodeId.class); private final NodeId nodeId = Records.newRecord(NodeId.class);
@ -283,6 +338,11 @@ public class NodeManager extends CompositeService implements
return new NodeManager(); return new NodeManager();
} }
// For testing
ContainerManagerImpl getContainerManager() {
return containerManager;
}
public static void main(String[] args) { public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);

View File

@ -363,7 +363,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
.getContainersToCleanupList(); .getContainersToCleanupList();
if (containersToCleanup.size() != 0) { if (containersToCleanup.size() != 0) {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup)); new CMgrCompletedContainersEvent(containersToCleanup,
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
} }
List<ApplicationId> appsToCleanup = List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanupList(); response.getApplicationsToCleanupList();

View File

@ -23,6 +23,8 @@ import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -593,9 +595,16 @@ public class ContainerManagerImpl extends CompositeService implements
(CMgrCompletedContainersEvent) event; (CMgrCompletedContainersEvent) event;
for (ContainerId container : containersFinishedEvent for (ContainerId container : containersFinishedEvent
.getContainersToCleanup()) { .getContainersToCleanup()) {
String diagnostic = "";
if (containersFinishedEvent.getReason() ==
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN) {
diagnostic = "Container Killed on Shutdown";
} else if (containersFinishedEvent.getReason() ==
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Container Killed by ResourceManager";
}
this.dispatcher.getEventHandler().handle( this.dispatcher.getEventHandler().handle(
new ContainerKillEvent(container, new ContainerKillEvent(container, diagnostic));
"Container Killed by ResourceManager"));
} }
break; break;
default: default:

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
/**
* This class allows a node manager to run without without communicating with a
* real RM.
*/
public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
static final Log LOG = LogFactory.getLog(MockNodeStatusUpdater.class);
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private ResourceTracker resourceTracker;
public MockNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
resourceTracker = new MockResourceTracker();
}
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
}
private static class MockResourceTracker implements ResourceTracker {
private int heartBeatID;
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException {
RegistrationResponse regResponse = recordFactory
.newRecordInstance(RegistrationResponse.class);
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
response.setRegistrationResponse(regResponse);
return response;
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException {
NodeStatus nodeStatus = request.getNodeStatus();
LOG.info("Got heartbeat number " + heartBeatID);
nodeStatus.setResponseId(heartBeatID++);
HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
return nhResponse;
}
}
}

View File

@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestNodeManagerShutdown {
static final File basedir =
new File("target", TestNodeManagerShutdown.class.getName());
static final File tmpDir = new File(basedir, "tmpDir");
static final File logsDir = new File(basedir, "logs");
static final File remoteLogsDir = new File(basedir, "remotelogs");
static final File nmLocalDir = new File(basedir, "nm0");
static final File processStartFile = new File(tmpDir, "start_file.txt")
.getAbsoluteFile();
static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
static final String user = "nobody";
private FileContext localFS;
@Before
public void setup() throws UnsupportedFileSystemException {
localFS = FileContext.getLocalFSFileContext();
tmpDir.mkdirs();
logsDir.mkdirs();
remoteLogsDir.mkdirs();
nmLocalDir.mkdirs();
}
@After
public void tearDown() throws IOException, InterruptedException {
localFS.delete(new Path(basedir.getPath()), true);
}
@Test
public void testKillContainersOnShutdown() throws IOException {
NodeManager nm = getNodeManager();
nm.init(createNMConfig());
nm.start();
ContainerManagerImpl containerManager = nm.getContainerManager();
File scriptFile = createUnhaltingScriptFile();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
// Construct the Container-id
ContainerId cId = createContainerId();
containerLaunchContext.setContainerId(cId);
containerLaunchContext.setUser(user);
URL localResourceUri =
ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource localResource =
recordFactory.newRecordInstance(LocalResource.class);
localResource.setResource(localResourceUri);
localResource.setSize(-1);
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
localResource.setType(LocalResourceType.FILE);
localResource.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, localResource);
containerLaunchContext.setLocalResources(localResources);
containerLaunchContext.setUser(containerLaunchContext.getUser());
List<String> commands = new ArrayList<String>();
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
containerLaunchContext.setResource(recordFactory
.newRecordInstance(Resource.class));
containerLaunchContext.getResource().setMemory(1024);
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
containerManager.startContainer(startRequest);
GetContainerStatusRequest request =
recordFactory.newRecordInstance(GetContainerStatusRequest.class);
request.setContainerId(cId);
ContainerStatus containerStatus =
containerManager.getContainerStatus(request).getStatus();
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
try {Thread.sleep(5000);} catch (InterruptedException ex) {ex.printStackTrace();}
nm.stop();
// Now verify the contents of the file
// Script generates a message when it receives a sigterm
// so we look for that
BufferedReader reader =
new BufferedReader(new FileReader(processStartFile));
boolean foundSigTermMessage = false;
while (true) {
String line = reader.readLine();
if (line == null) {
break;
}
if (line.contains("SIGTERM")) {
foundSigTermMessage = true;
break;
}
}
Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
reader.close();
}
private ContainerId createContainerId() {
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(0);
appId.setId(0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId containerId =
recordFactory.newRecordInstance(ContainerId.class);
containerId.setApplicationAttemptId(appAttemptId);
return containerId;
}
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
return conf;
}
/**
* Creates a script to run a container that will run forever unless
* stopped by external means.
*/
private File createUnhaltingScriptFile() throws IOException {
File scriptFile = new File(tmpDir, "scriptFile.sh");
BufferedWriter fileWriter = new BufferedWriter(new FileWriter(scriptFile));
fileWriter.write("#!/bin/bash\n\n");
fileWriter.write("echo \"Running testscript for delayed kill\"\n");
fileWriter.write("hello=\"Got SIGTERM\"\n");
fileWriter.write("umask 0\n");
fileWriter.write("trap \"echo $hello >> " + processStartFile + "\" SIGTERM\n");
fileWriter.write("echo \"Writing pid to start file\"\n");
fileWriter.write("echo $$ >> " + processStartFile + "\n");
fileWriter.write("while true; do\nsleep 1s;\ndone\n");
fileWriter.close();
return scriptFile;
}
private NodeManager getNodeManager() {
return new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
context, dispatcher, healthChecker, metrics);
return myNodeStatusUpdater;
}
};
}
}