From c2febdcbaa12078db42403fe8fd74180fb58a84b Mon Sep 17 00:00:00 2001 From: Junping Du Date: Tue, 12 Aug 2014 10:56:13 +0000 Subject: [PATCH] YARN-1337. Recover containers upon nodemanager restart. (Contributed by Jason Lowe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617448 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../server/nodemanager/ContainerExecutor.java | 73 ++++++- .../nodemanager/DefaultContainerExecutor.java | 59 +++++- .../nodemanager/LinuxContainerExecutor.java | 7 + .../server/nodemanager/NodeStatusUpdater.java | 23 +++ .../nodemanager/NodeStatusUpdaterImpl.java | 34 ++-- .../ContainerManagerImpl.java | 167 ++++++++++++---- .../container/ContainerImpl.java | 165 ++++++++++++---- .../launcher/ContainerLaunch.java | 46 +++-- .../launcher/ContainersLauncher.java | 15 +- .../launcher/ContainersLauncherEventType.java | 1 + .../launcher/RecoveredContainerLaunch.java | 126 ++++++++++++ .../logaggregation/AppLogAggregator.java | 2 + .../logaggregation/AppLogAggregatorImpl.java | 14 +- .../logaggregation/LogAggregationService.java | 10 +- .../recovery/NMLeveldbStateStoreService.java | 167 ++++++++++++++++ .../recovery/NMNullStateStoreService.java | 38 ++++ .../recovery/NMStateStoreService.java | 185 ++++++++++++++++++ .../impl/container-executor.c | 99 ++++++++++ .../nodemanager/TestNodeStatusUpdater.java | 13 +- .../BaseContainerManagerTest.java | 2 +- .../containermanager/TestAuxServices.java | 3 +- .../TestContainerManagerRecovery.java | 1 + .../container/TestContainer.java | 3 +- .../recovery/NMMemoryStateStoreService.java | 76 +++++++ .../TestNMLeveldbStateStoreService.java | 124 ++++++++++++ .../nodemanager/webapp/TestNMWebServer.java | 2 +- .../resourcemanager/rmnode/RMNodeImpl.java | 38 +--- .../scheduler/SchedulerUtils.java | 7 +- .../scheduler/capacity/CapacityScheduler.java | 5 +- .../TestResourceTrackerService.java | 2 +- 31 files changed, 1341 insertions(+), 169 deletions(-) create mode 
100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index daa5197f893..f583143f060 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -38,6 +38,9 @@ Release 2.6.0 - UNRELEASED YARN-1354. Recover applications upon nodemanager restart. (Jason Lowe via junping_du) + YARN-1337. Recover containers upon nodemanager restart. (Jason Lowe via + junping_du) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index ee72fbc6647..7391872f680 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -29,17 +30,18 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; @@ -126,9 +128,76 @@ public abstract class ContainerExecutor implements Configurable { public abstract void deleteAsUser(String user, Path subDir, Path... basedirs) throws IOException, InterruptedException; + public abstract boolean isContainerProcessAlive(String user, String pid) + throws IOException; + + /** + * Recover an already existing container. This is a blocking call and returns + * only when the container exits. Note that the container must have been + * activated prior to this call. 
+ * @param user the user of the container + * @param containerId The ID of the container to reacquire + * @return The exit code of the pre-existing container + * @throws IOException if the pid or exit code cannot be read, or the wait is interrupted or times out + */ + public int reacquireContainer(String user, ContainerId containerId) + throws IOException { + Path pidPath = getPidFilePath(containerId); + if (pidPath == null) { + LOG.warn(containerId + " is not active, returning terminated error"); + return ExitCode.TERMINATED.getExitCode(); + } + + String pid = null; + pid = ProcessIdFileReader.getProcessId(pidPath); + if (pid == null) { + throw new IOException("Unable to determine pid for " + containerId); + } + + LOG.info("Reacquiring " + containerId + " with pid " + pid); + try { + while(isContainerProcessAlive(user, pid)) { + Thread.sleep(1000); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // keep the interrupt visible to callers + throw new IOException("Interrupted while waiting for process " + pid + " to exit", e); + } + + // wait for exit code file to appear + String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString()); + File file = new File(exitCodeFile); + final int sleepMsec = 100; + int msecLeft = 2000; + while (!file.exists() && msecLeft >= 0) { + if (!isContainerActive(containerId)) { + LOG.info(containerId + " was deactivated"); + return ExitCode.TERMINATED.getExitCode(); + } + try { + Thread.sleep(sleepMsec); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // keep the interrupt visible to callers + throw new IOException("Interrupted while waiting for exit code from " + containerId, e); + } + msecLeft -= sleepMsec; + } + if (!file.exists()) { // time out only if the file never appeared + throw new IOException("Timeout while waiting for exit code from " + + containerId); + } + + try { + return Integer.parseInt(FileUtils.readFileToString(file).trim()); + } catch (NumberFormatException e) { + throw new IOException("Error parsing exit code from pid " + pid, e); + } + } + public enum ExitCode { FORCE_KILLED(137), - TERMINATED(143); + TERMINATED(143), + LOST(154); private final int code; private ExitCode(int exitCode) { diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index d45371ad0d4..a7af1c59b6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -273,25 +273,57 @@ public class DefaultContainerExecutor extends ContainerExecutor { private final class UnixLocalWrapperScriptBuilder extends LocalWrapperScriptBuilder { + private final Path sessionScriptPath; public UnixLocalWrapperScriptBuilder(Path containerWorkDir) { super(containerWorkDir); + this.sessionScriptPath = new Path(containerWorkDir, + Shell.appendScriptExtension("default_container_executor_session")); + } + + @Override + public void writeLocalWrapperScript(Path launchDst, Path pidFile) + throws IOException { + writeSessionScript(launchDst, pidFile); + super.writeLocalWrapperScript(launchDst, pidFile); } @Override public void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) { - - // We need to do a move as writing to a file is not atomic - // Process reading a file being written to may get garbled data - // hence write pid to tmp file first followed by a mv + String exitCodeFile = ContainerLaunch.getExitCodeFile( + pidFile.toString()); + String tmpFile = exitCodeFile + ".tmp"; pout.println("#!/bin/bash"); - pout.println(); - pout.println("echo $$ > " + pidFile.toString() + ".tmp"); - pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); - String exec = Shell.isSetsidAvailable? 
"exec setsid" : "exec"; - pout.println(exec + " /bin/bash \"" + - launchDst.toUri().getPath().toString() + "\""); + pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\""); + pout.println("rc=$?"); + pout.println("echo $rc > \"" + tmpFile + "\""); + pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\""); + pout.println("exit $rc"); + } + + private void writeSessionScript(Path launchDst, Path pidFile) + throws IOException { + DataOutputStream out = null; + PrintStream pout = null; + try { + out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); + pout = new PrintStream(out); + // We need to do a move as writing to a file is not atomic + // Process reading a file being written to may get garbled data + // hence write pid to tmp file first followed by a mv + pout.println("#!/bin/bash"); + pout.println(); + pout.println("echo $$ > \"" + pidFile.toString() + ".tmp\""); // quoted so paths with spaces survive + pout.println("/bin/mv -f \"" + pidFile.toString() + ".tmp\" \"" + pidFile + "\""); + String exec = Shell.isSetsidAvailable? "exec setsid" : "exec"; + pout.println(exec + " /bin/bash \"" + + launchDst.toUri().getPath().toString() + "\""); + } finally { + IOUtils.cleanup(LOG, pout, out); + } + lfs.setPermission(sessionScriptPath, + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + } } @@ -310,6 +342,7 @@ public class DefaultContainerExecutor extends ContainerExecutor { @Override public void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) { + // TODO: exit code script for Windows // On Windows, the pid is the container ID, so that it can also serve as // the name of the job object created by winutils for task management. @@ -342,6 +375,12 @@ public class DefaultContainerExecutor extends ContainerExecutor { return true; } + @Override + public boolean isContainerProcessAlive(String user, String pid) + throws IOException { + return containerIsAlive(pid); + } + /** * Returns true if the process with the specified pid is alive.
* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index e03562678c7..7962da28c8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -403,6 +403,13 @@ public class LinuxContainerExecutor extends ContainerExecutor { } } + @Override + public boolean isContainerProcessAlive(String user, String pid) + throws IOException { + // Send a test signal to the process as the user to see if it's alive + return signalContainer(user, pid, Signal.NULL); + } + public void mountCgroups(List cgroupKVs, String hierarchy) throws IOException { List command = new ArrayList( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index 2439450c9e3..f0b99ecf342 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -23,11 +23,34 @@ import org.apache.hadoop.yarn.api.records.ContainerId; public interface NodeStatusUpdater extends Service { + 
/** + * Schedule a heartbeat to the ResourceManager outside of the normal, + * periodic heartbeating process. This is typically called when the state + * of containers on the node has changed to notify the RM sooner. + */ void sendOutofBandHeartBeat(); + /** + * Get the ResourceManager identifier received during registration + * @return the ResourceManager ID + */ long getRMIdentifier(); + /** + * Query if a container has recently completed + * @param containerId the container ID + * @return true if the container has recently completed + */ public boolean isContainerRecentlyStopped(ContainerId containerId); + /** + * Add a container to the list of containers that have recently completed + * @param containerId the ID of the completed container + */ + public void addCompletedContainer(ContainerId containerId); + + /** + * Clear the list of recently completed containers + */ public void clearFinishedContainersFromCache(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 0b8f5b4277a..b52b0fbf6e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -364,8 +364,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. 
- updateStoppedContainersInCache(container.getContainerId()); - addCompletedContainer(container); + addCompletedContainer(container.getContainerId()); } } if (LOG.isDebugEnabled()) { @@ -393,8 +392,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. - updateStoppedContainersInCache(container.getContainerId()); - addCompletedContainer(container); + addCompletedContainer(container.getContainerId()); } } LOG.info("Sending out " + containerStatuses.size() @@ -402,9 +400,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return containerStatuses; } - private void addCompletedContainer(Container container) { + @Override + public void addCompletedContainer(ContainerId containerId) { synchronized (previousCompletedContainers) { - previousCompletedContainers.add(container.getContainerId()); + previousCompletedContainers.add(containerId); + } + synchronized (recentlyStoppedContainers) { + removeVeryOldStoppedContainersFromCache(); + recentlyStoppedContainers.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); } } @@ -451,16 +455,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } - @Private - @VisibleForTesting - public void updateStoppedContainersInCache(ContainerId containerId) { - synchronized (recentlyStoppedContainers) { - removeVeryOldStoppedContainersFromCache(); - recentlyStoppedContainers.put(containerId, - System.currentTimeMillis() + durationToTrackStoppedContainers); - } - } - @Override public void clearFinishedContainersFromCache() { synchronized (recentlyStoppedContainers) { @@ -476,8 +470,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements Iterator i = recentlyStoppedContainers.keySet().iterator(); while (i.hasNext()) { - if 
(recentlyStoppedContainers.get(i.next()) < currentTime) { + ContainerId cid = i.next(); + if (recentlyStoppedContainers.get(cid) < currentTime) { i.remove(); + try { + context.getNMStateStore().removeContainer(cid); + } catch (IOException e) { + LOG.error("Unable to remove container " + cid + " in store", e); + } } else { break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index aee4e9a78a0..12166e0fa65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -127,6 +127,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -246,6 +248,10 @@ public class ContainerManagerImpl extends CompositeService implements 
recoverApplication(proto); } + for (RecoveredContainerState rcs : stateStore.loadContainersState()) { + recoverContainer(rcs); + } + String diagnostic = "Application marked finished during recovery"; for (ApplicationId appId : appsState.getFinishedApplications()) { dispatcher.getEventHandler().handle( @@ -276,6 +282,60 @@ public class ContainerManagerImpl extends CompositeService implements app.handle(new ApplicationInitEvent(appId, acls)); } + @SuppressWarnings("unchecked") + private void recoverContainer(RecoveredContainerState rcs) + throws IOException { + StartContainerRequest req = rcs.getStartRequest(); + ContainerLaunchContext launchContext = req.getContainerLaunchContext(); + ContainerTokenIdentifier token = + BuilderUtils.newContainerTokenIdentifier(req.getContainerToken()); + ContainerId containerId = token.getContainerID(); + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + + LOG.info("Recovering " + containerId + " in state " + rcs.getStatus() + + " with exit code " + rcs.getExitCode()); + + if (context.getApplications().containsKey(appId)) { + Credentials credentials = parseCredentials(launchContext); + Container container = new ContainerImpl(getConfig(), dispatcher, + context.getNMStateStore(), req.getContainerLaunchContext(), + credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), + rcs.getDiagnostics(), rcs.getKilled()); + context.getContainers().put(containerId, container); + dispatcher.getEventHandler().handle( + new ApplicationContainerInitEvent(container)); + } else { + if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { + LOG.warn(containerId + " has no corresponding application!"); + } + LOG.info("Adding " + containerId + " to recently stopped containers"); + nodeStatusUpdater.addCompletedContainer(containerId); + } + } + + private void waitForRecoveredContainers() throws InterruptedException { + final int sleepMsec = 100; + int waitIterations = 100; + List newContainers = new ArrayList(); 
+ while (--waitIterations >= 0) { + newContainers.clear(); + for (Container container : context.getContainers().values()) { + if (container.getContainerState() == org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.NEW) { + newContainers.add(container.getContainerId()); + } + } + if (newContainers.isEmpty()) { + break; + } + LOG.info("Waiting for containers: " + newContainers); + Thread.sleep(sleepMsec); + } + if (waitIterations < 0) { + LOG.warn("Timeout waiting for recovered containers"); + } + } + protected LogHandler createLogHandler(Configuration conf, Context context, DeletionService deletionService) { if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, @@ -309,6 +369,23 @@ public class ContainerManagerImpl extends CompositeService implements // Enqueue user dirs in deletion context Configuration conf = getConfig(); + final InetSocketAddress initialAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_ADDRESS, + YarnConfiguration.DEFAULT_NM_ADDRESS, + YarnConfiguration.DEFAULT_NM_PORT); + boolean usingEphemeralPort = (initialAddress.getPort() == 0); + if (context.getNMStateStore().canRecover() && usingEphemeralPort) { + throw new IllegalArgumentException("Cannot support recovery with an " + + "ephemeral server port. Check the setting of " + + YarnConfiguration.NM_ADDRESS); + } + // If recovering then delay opening the RPC service until the recovery + // of resources and containers have completed, otherwise requests from + // clients during recovery can interfere with the recovery process. + final boolean delayedRpcServerStart = + context.getNMStateStore().canRecover(); + Configuration serverConf = new Configuration(conf); // always enforce it to be token-based. 
@@ -318,12 +395,6 @@ public class ContainerManagerImpl extends CompositeService implements YarnRPC rpc = YarnRPC.create(conf); - InetSocketAddress initialAddress = conf.getSocketAddr( - YarnConfiguration.NM_BIND_HOST, - YarnConfiguration.NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_PORT); - server = rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, serverConf, this.context.getNMTokenSecretManager(), @@ -340,32 +411,61 @@ public class ContainerManagerImpl extends CompositeService implements LOG.info("Blocking new container-requests as container manager rpc" + " server is still starting."); this.setBlockNewContainerRequests(true); - server.start(); - - InetSocketAddress connectAddress; + String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST); String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS); - if (bindHost == null || bindHost.isEmpty() || - nmAddress == null || nmAddress.isEmpty()) { - connectAddress = NetUtils.getConnectAddress(server); - } else { - //a bind-host case with an address, to support overriding the first hostname - //found when querying for our hostname with the specified address, combine - //the specified address with the actual port listened on by the server - connectAddress = NetUtils.getConnectAddress( - new InetSocketAddress(nmAddress.split(":")[0], - server.getListenerAddress().getPort())); + String hostOverride = null; + if (bindHost != null && !bindHost.isEmpty() + && nmAddress != null && !nmAddress.isEmpty()) { + //a bind-host case with an address, to support overriding the first + //hostname found when querying for our hostname with the specified + //address, combine the specified address with the actual port listened + //on by the server + hostOverride = nmAddress.split(":")[0]; } - NodeId nodeId = NodeId.newInstance( - connectAddress.getAddress().getCanonicalHostName(), - connectAddress.getPort()); + // setup node ID + InetSocketAddress connectAddress; + if 
(delayedRpcServerStart) { + connectAddress = NetUtils.getConnectAddress(initialAddress); + } else { + server.start(); + connectAddress = NetUtils.getConnectAddress(server); + } + NodeId nodeId = buildNodeId(connectAddress, hostOverride); ((NodeManager.NMContext)context).setNodeId(nodeId); this.context.getNMTokenSecretManager().setNodeId(nodeId); this.context.getContainerTokenSecretManager().setNodeId(nodeId); + + // start remaining services + super.serviceStart(); + + if (delayedRpcServerStart) { + waitForRecoveredContainers(); + server.start(); + + // check that the node ID is as previously advertised + connectAddress = NetUtils.getConnectAddress(server); + NodeId serverNode = buildNodeId(connectAddress, hostOverride); + if (!serverNode.equals(nodeId)) { + throw new IOException("Node mismatch after server started, expected '" + + nodeId + "' but found '" + serverNode + "'"); + } + } + LOG.info("ContainerManager started at " + connectAddress); LOG.info("ContainerManager bound to " + initialAddress); - super.serviceStart(); + } + + private NodeId buildNodeId(InetSocketAddress connectAddress, + String hostOverride) { + if (hostOverride != null) { + connectAddress = NetUtils.getConnectAddress( + new InetSocketAddress(hostOverride, connectAddress.getPort())); + } + return NodeId.newInstance( + connectAddress.getAddress().getCanonicalHostName(), + connectAddress.getPort()); } void refreshServiceAcls(Configuration configuration, @@ -704,7 +804,8 @@ public class ContainerManagerImpl extends CompositeService implements Credentials credentials = parseCredentials(launchContext); Container container = - new ContainerImpl(getConfig(), this.dispatcher, launchContext, + new ContainerImpl(getConfig(), this.dispatcher, + context.getNMStateStore(), launchContext, credentials, metrics, containerTokenIdentifier); ApplicationId applicationID = containerId.getApplicationAttemptId().getApplicationId(); @@ -733,6 +834,7 @@ public class ContainerManagerImpl extends CompositeService 
implements new ApplicationInitEvent(applicationID, appAcls)); } + this.context.getNMStateStore().storeContainer(containerId, request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -780,7 +882,7 @@ public class ContainerManagerImpl extends CompositeService implements } private Credentials parseCredentials(ContainerLaunchContext launchContext) - throws YarnException { + throws IOException { Credentials credentials = new Credentials(); // //////////// Parse credentials ByteBuffer tokens = launchContext.getTokens(); @@ -789,15 +891,11 @@ public class ContainerManagerImpl extends CompositeService implements DataInputByteBuffer buf = new DataInputByteBuffer(); tokens.rewind(); buf.reset(tokens); - try { - credentials.readTokenStorageStream(buf); - if (LOG.isDebugEnabled()) { - for (Token tk : credentials.getAllTokens()) { - LOG.debug(tk.getService() + " = " + tk.toString()); - } + credentials.readTokenStorageStream(buf); + if (LOG.isDebugEnabled()) { + for (Token tk : credentials.getAllTokens()) { + LOG.debug(tk.getService() + " = " + tk.toString()); } - } catch (IOException e) { - throw RPCUtil.getRemoteException(e); } } // //////////// End of parsing credentials @@ -830,7 +928,7 @@ public class ContainerManagerImpl extends CompositeService implements @SuppressWarnings("unchecked") private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, - ContainerId containerID) throws YarnException { + ContainerId containerID) throws YarnException, IOException { String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); LOG.info("Stopping container with container Id: " + containerIDStr); @@ -843,6 +941,7 @@ public class ContainerManagerImpl extends CompositeService implements + " is not handled by this NodeManager"); } } else { + context.getNMStateStore().storeContainerKilled(containerID); dispatcher.getEventHandler().handle( new ContainerKillEvent(containerID, 
ContainerExitStatus.KILLED_BY_APPMASTER, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c565cf92347..fa54ee19b9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; +import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -62,6 +63,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -75,6 +78,7 @@ public class ContainerImpl implements Container { private final Lock readLock; private final Lock writeLock; private final Dispatcher dispatcher; + private final 
NMStateStoreService stateStore; private final Credentials credentials; private final NodeManagerMetrics metrics; private final ContainerLaunchContext launchContext; @@ -101,12 +105,19 @@ public class ContainerImpl implements Container { private final List appRsrcs = new ArrayList(); + // whether container has been recovered after a restart + private RecoveredContainerStatus recoveredStatus = + RecoveredContainerStatus.REQUESTED; + // whether container was marked as killed after recovery + private boolean recoveredAsKilled = false; + public ContainerImpl(Configuration conf, Dispatcher dispatcher, - ContainerLaunchContext launchContext, Credentials creds, - NodeManagerMetrics metrics, + NMStateStoreService stateStore, ContainerLaunchContext launchContext, + Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier) { this.daemonConf = conf; this.dispatcher = dispatcher; + this.stateStore = stateStore; this.launchContext = launchContext; this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); @@ -122,6 +133,21 @@ public class ContainerImpl implements Container { stateMachine = stateMachineFactory.make(this); } + // constructor for a recovered container + public ContainerImpl(Configuration conf, Dispatcher dispatcher, + NMStateStoreService stateStore, ContainerLaunchContext launchContext, + Credentials creds, NodeManagerMetrics metrics, + ContainerTokenIdentifier containerTokenIdentifier, + RecoveredContainerStatus recoveredStatus, int exitCode, + String diagnostics, boolean wasKilled) { + this(conf, dispatcher, stateStore, launchContext, creds, metrics, + containerTokenIdentifier); + this.recoveredStatus = recoveredStatus; + this.exitCode = exitCode; + this.recoveredAsKilled = wasKilled; + this.diagnostics.append(diagnostics); + } + private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION = new ContainerDoneTransition(); @@ -135,8 +161,10 @@ public 
class ContainerImpl implements Container { new StateMachineFactory(ContainerState.NEW) // From NEW State .addTransition(ContainerState.NEW, - EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED, - ContainerState.LOCALIZATION_FAILED), + EnumSet.of(ContainerState.LOCALIZING, + ContainerState.LOCALIZED, + ContainerState.LOCALIZATION_FAILED, + ContainerState.DONE), ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition()) .addTransition(ContainerState.NEW, ContainerState.NEW, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, @@ -281,7 +309,9 @@ public class ContainerImpl implements Container { UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, @@ -295,7 +325,9 @@ public class ContainerImpl implements Container { // we notify container of failed localization if localizer thread (for // that container) fails for some reason .addTransition(ContainerState.DONE, ContainerState.DONE, - ContainerEventType.RESOURCE_FAILED) + EnumSet.of(ContainerEventType.RESOURCE_FAILED, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) // create the topology tables .installTopology(); @@ -420,7 +452,7 @@ public class ContainerImpl implements Container { } } - @SuppressWarnings({"fallthrough", "unchecked"}) + @SuppressWarnings("fallthrough") private void finished() { ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); @@ -458,7 +490,11 @@ public class ContainerImpl implements Container { } metrics.releaseContainer(this.resource); + sendFinishedEvents(); + } + @SuppressWarnings("unchecked") + private void sendFinishedEvents() { // Inform the 
application @SuppressWarnings("rawtypes") EventHandler eventHandler = dispatcher.getEventHandler(); @@ -470,6 +506,45 @@ public class ContainerImpl implements Container { containerId, exitCode)); } + @SuppressWarnings("unchecked") // dispatcher not typed + private void sendLaunchEvent() { + ContainersLauncherEventType launcherEvent = + ContainersLauncherEventType.LAUNCH_CONTAINER; + if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { + // try to recover a container that was previously launched + launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; + } + dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(this, launcherEvent)); + } + + // Inform the ContainersMonitor to start monitoring the container's + // resource usage. + @SuppressWarnings("unchecked") // dispatcher not typed + private void sendContainerMonitorStartEvent() { + long pmemBytes = getResource().getMemory() * 1024L * 1024L; + float pmemRatio = daemonConf.getFloat( + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + long vmemBytes = (long) (pmemRatio * pmemBytes); + + dispatcher.getEventHandler().handle( + new ContainerStartMonitoringEvent(containerId, + vmemBytes, pmemBytes)); + } + + private void addDiagnostics(String...
diags) { + for (String s : diags) { + this.diagnostics.append(s); + } + try { + stateStore.storeContainerDiagnostics(containerId, diagnostics); + } catch (IOException e) { + LOG.warn("Unable to update diagnostics in state store for " + + containerId, e); + } + } + @SuppressWarnings("unchecked") // dispatcher not typed public void cleanup() { Map> rsrc = @@ -518,6 +593,16 @@ public class ContainerImpl implements Container { @Override public ContainerState transition(ContainerImpl container, ContainerEvent event) { + if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) { + container.sendFinishedEvents(); + return ContainerState.DONE; + } else if (container.recoveredAsKilled && + container.recoveredStatus == RecoveredContainerStatus.REQUESTED) { + // container was killed but never launched + container.finished(); + return ContainerState.DONE; + } + final ContainerLaunchContext ctxt = container.launchContext; container.metrics.initingContainer(); @@ -593,9 +678,7 @@ public class ContainerImpl implements Container { new ContainerLocalizationRequestEvent(container, req)); return ContainerState.LOCALIZING; } else { - container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.LAUNCH_CONTAINER)); + container.sendLaunchEvent(); container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } @@ -606,7 +689,6 @@ public class ContainerImpl implements Container { * Transition when one of the requested resources for this container * has been successfully localized. 
*/ - @SuppressWarnings("unchecked") // dispatcher not typed static class LocalizedTransition implements MultipleArcTransition { @Override @@ -626,9 +708,8 @@ public class ContainerImpl implements Container { if (!container.pendingResources.isEmpty()) { return ContainerState.LOCALIZING; } - container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.LAUNCH_CONTAINER)); + + container.sendLaunchEvent(); container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } @@ -638,24 +719,22 @@ public class ContainerImpl implements Container { * Transition from LOCALIZED state to RUNNING state upon receiving * a CONTAINER_LAUNCHED event */ - @SuppressWarnings("unchecked") // dispatcher not typed static class LaunchTransition extends ContainerTransition { + @SuppressWarnings("unchecked") @Override public void transition(ContainerImpl container, ContainerEvent event) { - // Inform the ContainersMonitor to start monitoring the container's - // resource usage. 
- long pmemBytes = - container.getResource().getMemory() * 1024 * 1024L; - float pmemRatio = container.daemonConf.getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - long vmemBytes = (long) (pmemRatio * pmemBytes); - - container.dispatcher.getEventHandler().handle( - new ContainerStartMonitoringEvent(container.containerId, - vmemBytes, pmemBytes)); + container.sendContainerMonitorStartEvent(); container.metrics.runningContainer(); container.wasLaunched = true; + + if (container.recoveredAsKilled) { + LOG.info("Killing " + container.containerId + + " due to recovered as killed"); + container.addDiagnostics("Container recovered as killed.\n"); + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } } } @@ -707,8 +786,7 @@ public class ContainerImpl implements Container { ContainerExitEvent exitEvent = (ContainerExitEvent) event; container.exitCode = exitEvent.getExitCode(); if (exitEvent.getDiagnosticInfo() != null) { - container.diagnostics.append(exitEvent.getDiagnosticInfo()) - .append('\n'); + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); } // TODO: Add containerWorkDir to the deletion service. @@ -735,7 +813,7 @@ public class ContainerImpl implements Container { @Override public void transition(ContainerImpl container, ContainerEvent event) { super.transition(container, event); - container.diagnostics.append("Killed by external signal\n"); + container.addDiagnostics("Killed by external signal\n"); } } @@ -750,9 +828,7 @@ public class ContainerImpl implements Container { ContainerResourceFailedEvent rsrcFailedEvent = (ContainerResourceFailedEvent) event; - container.diagnostics.append(rsrcFailedEvent.getDiagnosticMessage() - + "\n"); - + container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n"); // Inform the localizer to decrement reference counts and cleanup // resources. 
@@ -775,8 +851,8 @@ public class ContainerImpl implements Container { container.metrics.endInitingContainer(); ContainerKillEvent killEvent = (ContainerKillEvent) event; container.exitCode = killEvent.getContainerExitStatus(); - container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); - container.diagnostics.append("Container is killed before being launched.\n"); + container.addDiagnostics(killEvent.getDiagnostic(), "\n"); + container.addDiagnostics("Container is killed before being launched.\n"); } } @@ -817,7 +893,7 @@ public class ContainerImpl implements Container { new ContainersLauncherEvent(container, ContainersLauncherEventType.CLEANUP_CONTAINER)); ContainerKillEvent killEvent = (ContainerKillEvent) event; - container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); + container.addDiagnostics(killEvent.getDiagnostic(), "\n"); container.exitCode = killEvent.getContainerExitStatus(); } } @@ -836,8 +912,7 @@ public class ContainerImpl implements Container { } if (exitEvent.getDiagnosticInfo() != null) { - container.diagnostics.append(exitEvent.getDiagnosticInfo()) - .append('\n'); + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); } // The process/process-grp is killed. 
Decrement reference counts and @@ -877,8 +952,8 @@ public class ContainerImpl implements Container { public void transition(ContainerImpl container, ContainerEvent event) { ContainerKillEvent killEvent = (ContainerKillEvent) event; container.exitCode = killEvent.getContainerExitStatus(); - container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); - container.diagnostics.append("Container is killed before being launched.\n"); + container.addDiagnostics(killEvent.getDiagnostic(), "\n"); + container.addDiagnostics("Container is killed before being launched.\n"); super.transition(container, event); } } @@ -892,8 +967,14 @@ public class ContainerImpl implements Container { public void transition(ContainerImpl container, ContainerEvent event) { ContainerDiagnosticsUpdateEvent updateEvent = (ContainerDiagnosticsUpdateEvent) event; - container.diagnostics.append(updateEvent.getDiagnosticsUpdate()) - .append("\n"); + container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n"); + try { + container.stateStore.storeContainerDiagnostics(container.containerId, + container.diagnostics); + } catch (IOException e) { + LOG.warn("Unable to update state store diagnostics for " + + container.containerId, e); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index e252e35dfc8..cee6a40a558 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -87,22 +87,23 @@ public class ContainerLaunch implements Callable { public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens"; private static final String PID_FILE_NAME_FMT = "%s.pid"; + private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode"; - private final Dispatcher dispatcher; - private final ContainerExecutor exec; + protected final Dispatcher dispatcher; + protected final ContainerExecutor exec; private final Application app; - private final Container container; + protected final Container container; private final Configuration conf; private final Context context; private final ContainerManagerImpl containerManager; - private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false); - private volatile AtomicBoolean completed = new AtomicBoolean(false); + protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false); + protected AtomicBoolean completed = new AtomicBoolean(false); private long sleepDelayBeforeSigKill = 250; private long maxKillWaitTime = 2000; - private Path pidFilePath = null; + protected Path pidFilePath = null; private final LocalDirsHandlerService dirsHandler; @@ -223,14 +224,11 @@ public class ContainerLaunch implements Callable { + Path.SEPARATOR + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, false); - String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT, - containerIdStr); + String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); // pid file should be in nm private dir so that it is not // accessible by users - pidFilePath = dirsHandler.getLocalPathForWrite( - ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR - + pidFileSuffix); + pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath); List localDirs = dirsHandler.getLocalDirs(); List logDirs = 
dirsHandler.getLogDirs(); @@ -288,6 +286,7 @@ public class ContainerLaunch implements Callable { dispatcher.getEventHandler().handle(new ContainerEvent( containerID, ContainerEventType.CONTAINER_LAUNCHED)); + context.getNMStateStore().storeContainerLaunched(containerID); // Check if the container is signalled to be killed. if (!shouldLaunchContainer.compareAndSet(false, true)) { @@ -310,6 +309,11 @@ public class ContainerLaunch implements Callable { } finally { completed.set(true); exec.deactivateContainer(containerID); + try { + context.getNMStateStore().storeContainerCompleted(containerID, ret); + } catch (IOException e) { + LOG.error("Unable to set exit code for container " + containerID, e); + } } if (LOG.isDebugEnabled()) { @@ -342,6 +346,11 @@ public class ContainerLaunch implements Callable { ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); return 0; } + + protected String getPidFileSubpath(String appIdStr, String containerIdStr) { + return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR + + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr); + } /** * Cleanup the container.
@@ -357,6 +366,13 @@ public class ContainerLaunch implements Callable { String containerIdStr = ConverterUtils.toString(containerId); LOG.info("Cleaning up container " + containerIdStr); + try { + context.getNMStateStore().storeContainerKilled(containerId); + } catch (IOException e) { + LOG.error("Unable to mark container " + containerId + + " killed in store", e); + } + // launch flag will be set to true if process already launched boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); if (!alreadyLaunched) { @@ -421,6 +437,7 @@ public class ContainerLaunch implements Callable { if (pidFilePath != null) { FileContext lfs = FileContext.getLocalFSFileContext(); lfs.delete(pidFilePath, false); + lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false); } } } @@ -479,6 +496,10 @@ public class ContainerLaunch implements Callable { + appIdStr; } + Context getContext() { + return context; + } + @VisibleForTesting static abstract class ShellScriptBuilder { public static ShellScriptBuilder create() { @@ -787,4 +808,7 @@ public class ContainerLaunch implements Callable { } } + public static String getExitCodeFile(String pidFile) { + return pidFile + EXIT_CODE_FILE_SUFFIX; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index ce865e3f68f..6950aa9381a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,21 +31,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; @@ -107,7 +101,6 @@ public class ContainersLauncher extends AbstractService 
super.serviceStop(); } - @SuppressWarnings("unchecked") @Override public void handle(ContainersLauncherEvent event) { // TODO: ContainersLauncher launches containers one by one!! @@ -125,6 +118,14 @@ public class ContainersLauncher extends AbstractService containerLauncher.submit(launch); running.put(containerId, launch); break; + case RECOVER_CONTAINER: + app = context.getApplications().get( + containerId.getApplicationAttemptId().getApplicationId()); + launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher, + exec, app, event.getContainer(), dirsHandler, containerManager); + containerLauncher.submit(launch); + running.put(containerId, launch); + break; case CLEANUP_CONTAINER: ContainerLaunch launcher = running.remove(containerId); if (launcher == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index 6793bf75d20..385b5b2d9d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; public enum ContainersLauncherEventType { LAUNCH_CONTAINER, + RECOVER_CONTAINER, CLEANUP_CONTAINER, // The process(grp) itself. 
} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java new file mode 100644 index 00000000000..446695a974c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java @@ -0,0 +1,126 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; +import org.apache.hadoop.yarn.util.ConverterUtils; + +/** + * This is a ContainerLaunch which has been recovered after an NM restart (for + * rolling upgrades) + */ +public class RecoveredContainerLaunch extends ContainerLaunch { + + private static final Log LOG = LogFactory.getLog( + RecoveredContainerLaunch.class); + + public RecoveredContainerLaunch(Context context, Configuration configuration, + Dispatcher dispatcher, ContainerExecutor exec, Application app, + Container container, LocalDirsHandlerService dirsHandler, + ContainerManagerImpl containerManager) + { + super(context, configuration, dispatcher, exec, app, container, dirsHandler, + containerManager); + this.shouldLaunchContainer.set(true); + } + + /** + * Wait on the process 
specified in pid file and return its exit code + */ + @SuppressWarnings("unchecked") + @Override + public Integer call() { + int retCode = ExitCode.LOST.getExitCode(); + ContainerId containerId = container.getContainerId(); + String appIdStr = ConverterUtils.toString( + containerId.getApplicationAttemptId().getApplicationId()); + String containerIdStr = ConverterUtils.toString(containerId); + + dispatcher.getEventHandler().handle(new ContainerEvent(containerId, + ContainerEventType.CONTAINER_LAUNCHED)); + + try { + File pidFile = locatePidFile(appIdStr, containerIdStr); + if (pidFile != null) { + String pidPathStr = pidFile.getPath(); + pidFilePath = new Path(pidPathStr); + exec.activateContainer(containerId, pidFilePath); + retCode = exec.reacquireContainer(container.getUser(), containerId); + } else { + LOG.warn("Unable to locate pid file for container " + containerIdStr); + } + } catch (IOException e) { + LOG.error("Unable to recover container " + containerIdStr, e); + } finally { + this.completed.set(true); + exec.deactivateContainer(containerId); + try { + getContext().getNMStateStore().storeContainerCompleted(containerId, + retCode); + } catch (IOException e) { + LOG.error("Unable to set exit code for container " + containerId, e); + } + } + + if (retCode != 0) { + LOG.warn("Recovered container exited with a non-zero exit code " + + retCode); + this.dispatcher.getEventHandler().handle(new ContainerExitEvent( + containerId, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode, + "Container exited with a non-zero exit code " + retCode)); + return retCode; + } + + LOG.info("Recovered container " + containerId + " succeeded"); + dispatcher.getEventHandler().handle( + new ContainerEvent(containerId, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); + return 0; + } + + private File locatePidFile(String appIdStr, String containerIdStr) { + String pidSubpath= getPidFileSubpath(appIdStr, containerIdStr); + for (String dir :
getContext().getLocalDirsHandler().getLocalDirs()) { + File pidFile = new File(dir, pidSubpath); + if (pidFile.exists()) { + return pidFile; + } + } + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index deb6c045c83..0b72a39b682 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -25,5 +25,7 @@ public interface AppLogAggregator extends Runnable { void startContainerLogAggregation(ContainerId containerId, boolean wasContainerSuccessful); + void abortLogAggregation(); + void finishLogAggregation(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 805b9e0b1ba..1af48bbf1e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -70,6 +70,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final BlockingQueue pendingContainers; private final AtomicBoolean appFinishing = new AtomicBoolean(); private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); + private final AtomicBoolean aborted = new AtomicBoolean(); private final Map appAcls; private LogWriter writer = null; @@ -150,7 +151,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private void doAppLogAggregation() { ContainerId containerId; - while (!this.appFinishing.get()) { + while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { wait(THREAD_SLEEP_TIME); @@ -161,6 +162,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } + if (this.aborted.get()) { + return; + } + // Application is finished. 
Finish pending-containers while ((containerId = this.pendingContainers.poll()) != null) { uploadLogsForContainer(containerId); @@ -255,4 +260,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.appFinishing.set(true); this.notifyAll(); } + + @Override + public synchronized void abortLogAggregation() { + LOG.info("Aborting log aggregation for " + this.applicationId); + this.aborted.set(true); + this.notifyAll(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index efe8984694e..58e1837ebea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -142,9 +142,17 @@ public class LogAggregationService extends AbstractService implements private void stopAggregators() { threadPool.shutdown(); + // if recovery on restart is supported then leave outstanding aggregations + // to the next restart + boolean shouldAbort = context.getNMStateStore().canRecover() + && !context.getDecommissioned(); // politely ask to finish for (AppLogAggregator aggregator : appLogAggregators.values()) { - aggregator.finishLogAggregation(); + if (shouldAbort) { + aggregator.abortLogAggregation(); + } else { + aggregator.finishLogAggregation(); + } } while (!threadPool.isTerminated()) { // wait for all threads to finish for (ApplicationId appId : 
appLogAggregators.keySet()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index c3fc272d7f5..7c95fff9986 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import 
org.apache.hadoop.yarn.server.records.Version; @@ -90,6 +93,14 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/"; private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/"; + private static final String CONTAINERS_KEY_PREFIX = + "ContainerManager/containers/"; + private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; + private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; + private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; + private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; + private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; + private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/"; @@ -104,6 +115,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINER_TOKENS_PREV_MASTER_KEY = CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX; + private static final byte[] EMPTY_VALUE = new byte[0]; + private DB db; public NMLeveldbStateStoreService() { @@ -122,6 +135,160 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } + @Override + public List loadContainersState() + throws IOException { + ArrayList containers = + new ArrayList(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(CONTAINERS_KEY_PREFIX)); + + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { + break; + } + + int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length()); + if (idEndPos < 0) { + throw new IOException("Unable to determine container in key: " + key); + } + ContainerId containerId = ConverterUtils.toContainerId( + 
key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); + String keyPrefix = key.substring(0, idEndPos+1); + containers.add(loadContainerState(containerId, iter, keyPrefix)); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + + return containers; + } + + private RecoveredContainerState loadContainerState(ContainerId containerId, + LeveldbIterator iter, String keyPrefix) throws IOException { + RecoveredContainerState rcs = new RecoveredContainerState(); + rcs.status = RecoveredContainerStatus.REQUESTED; + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + iter.next(); + + String suffix = key.substring(keyPrefix.length()-1); // start with '/' + if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) { + rcs.startRequest = new StartContainerRequestPBImpl( + StartContainerRequestProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) { + rcs.diagnostics = asString(entry.getValue()); + } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { + if (rcs.status == RecoveredContainerStatus.REQUESTED) { + rcs.status = RecoveredContainerStatus.LAUNCHED; + } + } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) { + rcs.killed = true; + } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) { + rcs.status = RecoveredContainerStatus.COMPLETED; + rcs.exitCode = Integer.parseInt(asString(entry.getValue())); + } else { + throw new IOException("Unexpected container state key: " + key); + } + } + return rcs; + } + + @Override + public void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_REQUEST_KEY_SUFFIX; + try { + db.put(bytes(key), + ((StartContainerRequestPBImpl) startRequest).getProto().toByteArray()); + } catch (DBException e) { + throw new 
IOException(e); + } + } + + @Override + public void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_DIAGS_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(diagnostics.toString())); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerLaunched(ContainerId containerId) + throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_LAUNCHED_KEY_SUFFIX; + try { + db.put(bytes(key), EMPTY_VALUE); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerKilled(ContainerId containerId) + throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_KILLED_KEY_SUFFIX; + try { + db.put(bytes(key), EMPTY_VALUE); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerCompleted(ContainerId containerId, + int exitCode) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_EXIT_CODE_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(Integer.toString(exitCode))); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeContainer(ContainerId containerId) + throws IOException { + String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString(); + try { + WriteBatch batch = db.createWriteBatch(); + try { + batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public 
RecoveredApplicationsState loadApplicationsState() throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 3bb1e2189fc..66469697987 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -62,6 +64,42 @@ public class NMNullStateStoreService extends NMStateStoreService { public void removeApplication(ApplicationId appId) throws IOException { } + @Override + public List loadContainersState() + throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException { + } + + @Override + public void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException { + } + + @Override + public void storeContainerLaunched(ContainerId containerId) + throws IOException { + } + + 
@Override + public void storeContainerKilled(ContainerId containerId) + throws IOException { + } + + @Override + public void storeContainerCompleted(ContainerId containerId, int exitCode) + throws IOException { + } + + @Override + public void removeContainer(ContainerId containerId) throws IOException { + } + @Override public RecoveredLocalizationState loadLocalizationState() throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index f0988e36172..a9699f370b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -29,8 +29,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -59,6 +61,40 @@ public abstract class NMStateStoreService extends AbstractService { } } + public enum RecoveredContainerStatus { + 
REQUESTED, + LAUNCHED, + COMPLETED + } + + public static class RecoveredContainerState { + RecoveredContainerStatus status; + int exitCode = ContainerExitStatus.INVALID; + boolean killed = false; + String diagnostics = ""; + StartContainerRequest startRequest; + + public RecoveredContainerStatus getStatus() { + return status; + } + + public int getExitCode() { + return exitCode; + } + + public boolean getKilled() { + return killed; + } + + public String getDiagnostics() { + return diagnostics; + } + + public StartContainerRequest getStartRequest() { + return startRequest; + } + } + public static class LocalResourceTrackerState { List localizedResources = new ArrayList(); @@ -176,19 +212,100 @@ public abstract class NMStateStoreService extends AbstractService { } + /** + * Load the state of applications + * @return recovered state for applications + * @throws IOException + */ public abstract RecoveredApplicationsState loadApplicationsState() throws IOException; + /** + * Record the start of an application + * @param appId the application ID + * @param p state to store for the application + * @throws IOException + */ public abstract void storeApplication(ApplicationId appId, ContainerManagerApplicationProto p) throws IOException; + /** + * Record that an application has finished + * @param appId the application ID + * @throws IOException + */ public abstract void storeFinishedApplication(ApplicationId appId) throws IOException; + /** + * Remove records corresponding to an application + * @param appId the application ID + * @throws IOException + */ public abstract void removeApplication(ApplicationId appId) throws IOException; + /** + * Load the state of containers + * @return recovered state for containers + * @throws IOException + */ + public abstract List loadContainersState() + throws IOException; + + /** + * Record a container start request + * @param containerId the container ID + * @param startRequest the container start request + * @throws IOException + */ + 
public abstract void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException; + + /** + * Record that a container has been launched + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerLaunched(ContainerId containerId) + throws IOException; + + /** + * Record that a container has completed + * @param containerId the container ID + * @param exitCode the exit code from the container + * @throws IOException + */ + public abstract void storeContainerCompleted(ContainerId containerId, + int exitCode) throws IOException; + + /** + * Record a request to kill a container + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerKilled(ContainerId containerId) + throws IOException; + + /** + * Record diagnostics for a container + * @param containerId the container ID + * @param diagnostics the container diagnostics + * @throws IOException + */ + public abstract void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException; + + /** + * Remove records corresponding to a container + * @param containerId the container ID + * @throws IOException + */ + public abstract void removeContainer(ContainerId containerId) + throws IOException; + + /** * Load the state of localized resources * @return recovered localized resource state @@ -230,43 +347,111 @@ public abstract class NMStateStoreService extends AbstractService { ApplicationId appId, Path localPath) throws IOException; + /** + * Load the state of the deletion service + * @return recovered deletion service state + * @throws IOException + */ public abstract RecoveredDeletionServiceState loadDeletionServiceState() throws IOException; + /** + * Record a deletion task + * @param taskId the deletion task ID + * @param taskProto the deletion task protobuf + * @throws IOException + */ public abstract void storeDeletionTask(int taskId, 
DeletionServiceDeleteTaskProto taskProto) throws IOException; + /** + * Remove records corresponding to a deletion task + * @param taskId the deletion task ID + * @throws IOException + */ public abstract void removeDeletionTask(int taskId) throws IOException; + /** + * Load the state of NM tokens + * @return recovered state of NM tokens + * @throws IOException + */ public abstract RecoveredNMTokensState loadNMTokensState() throws IOException; + /** + * Record the current NM token master key + * @param key the master key + * @throws IOException + */ public abstract void storeNMTokenCurrentMasterKey(MasterKey key) throws IOException; + /** + * Record the previous NM token master key + * @param key the previous master key + * @throws IOException + */ public abstract void storeNMTokenPreviousMasterKey(MasterKey key) throws IOException; + /** + * Record a master key corresponding to an application + * @param attempt the application attempt ID + * @param key the master key + * @throws IOException + */ public abstract void storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key) throws IOException; + /** + * Remove a master key corresponding to an application + * @param attempt the application attempt ID + * @throws IOException + */ public abstract void removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt) throws IOException; + /** + * Load the state of container tokens + * @return recovered state of container tokens + * @throws IOException + */ public abstract RecoveredContainerTokensState loadContainerTokensState() throws IOException; + /** + * Record the current container token master key + * @param key the master key + * @throws IOException + */ public abstract void storeContainerTokenCurrentMasterKey(MasterKey key) throws IOException; + /** + * Record the previous container token master key + * @param key the previous master key + * @throws IOException + */ public abstract void storeContainerTokenPreviousMasterKey(MasterKey key) 
throws IOException; + /** + * Record the expiration time for a container token + * @param containerId the container ID + * @param expirationTime the container token expiration time + * @throws IOException + */ public abstract void storeContainerToken(ContainerId containerId, Long expirationTime) throws IOException; + /** + * Remove records for a container token + * @param containerId the container ID + * @throws IOException + */ public abstract void removeContainerToken(ContainerId containerId) throws IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c index 16ede961edc..b64da19ae78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c @@ -33,6 +33,7 @@ #include #include #include +#include static const int DEFAULT_MIN_USERID = 1000; @@ -244,6 +245,85 @@ static int write_pid_to_file_as_nm(const char* pid_file, pid_t pid) { return 0; } +/** + * Write the exit code of the container into the exit code file + * exit_code_file: Path to exit code file where exit code needs to be written + */ +static int write_exit_code_file(const char* exit_code_file, int exit_code) { + char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1, + exit_code_file); + if (tmp_ecode_file == NULL) { + return -1; + } + + // create with 700 + int ecode_fd = open(tmp_ecode_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU); + if (ecode_fd == -1) { + fprintf(LOGFILE, "Can't open file %s - %s\n", tmp_ecode_file, + strerror(errno)); + free(tmp_ecode_file); + return -1; + } + + char ecode_buf[21]; + 
snprintf(ecode_buf, sizeof(ecode_buf), "%d", exit_code); + ssize_t written = write(ecode_fd, ecode_buf, strlen(ecode_buf)); + close(ecode_fd); + if (written == -1) { + fprintf(LOGFILE, "Failed to write exit code to file %s - %s\n", + tmp_ecode_file, strerror(errno)); + free(tmp_ecode_file); + return -1; + } + + // rename temp file to actual exit code file + // use rename as atomic + if (rename(tmp_ecode_file, exit_code_file)) { + fprintf(LOGFILE, "Can't move exit code file from %s to %s - %s\n", + tmp_ecode_file, exit_code_file, strerror(errno)); + unlink(tmp_ecode_file); + free(tmp_ecode_file); + return -1; + } + + free(tmp_ecode_file); + return 0; +} + +/** + * Wait for the container process to exit and write the exit code to + * the exit code file. + * Returns the exit code of the container process. + */ +static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) { + int child_status = -1; + int exit_code = -1; + int waitpid_result; + + if (change_effective_user(nm_uid, nm_gid) != 0) { + return -1; + } + do { + waitpid_result = waitpid(pid, &child_status, 0); + } while (waitpid_result == -1 && errno == EINTR); + if (waitpid_result < 0) { + fprintf(LOGFILE, "Error waiting for container process %d - %s\n", + pid, strerror(errno)); + return -1; + } + if (WIFEXITED(child_status)) { + exit_code = WEXITSTATUS(child_status); + } else if (WIFSIGNALED(child_status)) { + exit_code = 0x80 + WTERMSIG(child_status); + } else { + fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid); + } + if (write_exit_code_file(exit_code_file, exit_code) < 0) { + return -1; + } + return exit_code; +} + /** * Change the real and effective user and group to abandon the super user * priviledges. 
@@ -337,6 +417,10 @@ char *get_container_work_directory(const char *nm_root, const char *user, nm_root, user, app_id, container_id); } +char *get_exit_code_file(const char* pid_file) { + return concatenate("%s.exitcode", "exit_code_file", 1, pid_file); +} + char *get_container_launcher_file(const char* work_dir) { return concatenate("%s/%s", "container launcher", 2, work_dir, CONTAINER_SCRIPT); } @@ -879,6 +963,8 @@ int launch_container_as_user(const char *user, const char *app_id, int exit_code = -1; char *script_file_dest = NULL; char *cred_file_dest = NULL; + char *exit_code_file = NULL; + script_file_dest = get_container_launcher_file(work_dir); if (script_file_dest == NULL) { exit_code = OUT_OF_MEMORY; @@ -889,6 +975,11 @@ int launch_container_as_user(const char *user, const char *app_id, exit_code = OUT_OF_MEMORY; goto cleanup; } + exit_code_file = get_exit_code_file(pid_file); + if (NULL == exit_code_file) { + exit_code = OUT_OF_MEMORY; + goto cleanup; + } // open launch script int container_file_source = open_file_as_nm(script_name); @@ -902,6 +993,13 @@ int launch_container_as_user(const char *user, const char *app_id, goto cleanup; } + pid_t child_pid = fork(); + if (child_pid != 0) { + // parent + exit_code = wait_and_write_exit_code(child_pid, exit_code_file); + goto cleanup; + } + // setsid pid_t pid = setsid(); if (pid == -1) { @@ -986,6 +1084,7 @@ int launch_container_as_user(const char *user, const char *app_id, exit_code = 0; cleanup: + free(exit_code_file); free(script_file_dest); free(cred_file_dest); return exit_code; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index fecc837cfab..f2a3a4a8c0c 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -201,6 +201,7 @@ public class TestNodeStatusUpdater { Dispatcher mockDispatcher = mock(Dispatcher.class); EventHandler mockEventHandler = mock(EventHandler.class); when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler); + NMStateStoreService stateStore = new NMNullStateStoreService(); nodeStatus.setResponseId(heartBeatID++); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); @@ -226,9 +227,8 @@ public class TestNodeStatusUpdater { firstContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); - Container container = - new ContainerImpl(conf, mockDispatcher, launchContext, null, - mockMetrics, containerToken); + Container container = new ContainerImpl(conf, mockDispatcher, + stateStore, launchContext, null, mockMetrics, containerToken); this.context.getContainers().put(firstContainerID, container); } else if (heartBeatID == 2) { // Checks on the RM end @@ -257,9 +257,8 @@ public class TestNodeStatusUpdater { secondContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); - Container container = - new ContainerImpl(conf, mockDispatcher, launchContext, null, - mockMetrics, containerToken); + Container container = new ContainerImpl(conf, mockDispatcher, + stateStore, launchContext, null, mockMetrics, containerToken); this.context.getContainers().put(secondContainerID, container); } else if (heartBeatID == 3) { // Checks on the RM end @@ -784,7 +783,7 @@ public class TestNodeStatusUpdater 
{ ContainerId cId = ContainerId.newInstance(appAttemptId, 0); - nodeStatusUpdater.updateStoppedContainersInCache(cId); + nodeStatusUpdater.addCompletedContainer(cId); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); long time1 = System.currentTimeMillis(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index fc92b36fd7e..1907e1a0c3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -233,7 +233,7 @@ public abstract class BaseContainerManagerTest { protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path[] baseDirs) { + public void delete(String user, Path subDir, Path... baseDirs) { // Don't do any deletions. 
LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir + ", baseDirs - " + baseDirs); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 48e028180a7..59cc947e3b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -191,7 +191,8 @@ public class TestAuxServices { ContainerTokenIdentifier cti = new ContainerTokenIdentifier( ContainerId.newInstance(attemptId, 1), "", "", Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0); - Container container = new ContainerImpl(null, null, null, null, null, cti); + Container container = new ContainerImpl(null, null, null, null, null, + null, cti); ContainerId containerId = container.getContainerId(); Resource resource = container.getResource(); event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 00eb2fe288b..0319664c398 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -80,6 +80,7 @@ public class TestContainerManagerRecovery { public void testApplicationRecovery() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234"); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user"); NMStateStoreService stateStore = new NMMemoryStateStoreService(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index e0239928221..a813e9806c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -780,7 +780,8 @@ public class TestContainer { } when(ctxt.getServiceData()).thenReturn(serviceData); - c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier); + c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(), + ctxt, null, metrics, identifier); dispatcher.register(ContainerEventType.class, 
new EventHandler() { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 5b5e1ef5bb8..d4040915ef7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -22,13 +22,16 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; public class NMMemoryStateStoreService extends NMStateStoreService { private Map apps; private Set finishedApps; + private Map containerStates; private Map trackerStates; private Map deleteTasks; private RecoveredNMTokensState nmTokenState; @@ -53,6 +57,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { protected 
void initStorage(Configuration conf) {
     apps = new HashMap();
     finishedApps = new HashSet();
+    containerStates = new HashMap();
     nmTokenState = new RecoveredNMTokensState();
     nmTokenState.applicationMasterKeys =
         new HashMap();
@@ -100,6 +105,77 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     finishedApps.remove(appId);
   }
 
+  @Override
+  public List loadContainersState()
+      throws IOException {
+    // return a copy so caller can't modify our state
+    List result =
+        new ArrayList(containerStates.size());
+    for (RecoveredContainerState rcs : containerStates.values()) {
+      RecoveredContainerState rcsCopy = new RecoveredContainerState();
+      rcsCopy.status = rcs.status;
+      rcsCopy.exitCode = rcs.exitCode;
+      rcsCopy.killed = rcs.killed;
+      rcsCopy.diagnostics = rcs.diagnostics;
+      rcsCopy.startRequest = rcs.startRequest;
+      result.add(rcsCopy);
+    }
+    return result;
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+    RecoveredContainerState rcs = new RecoveredContainerState();
+    rcs.startRequest = startRequest;
+    containerStates.put(containerId, rcs);
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.diagnostics = diagnostics.toString();
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    if (rcs.exitCode != ContainerExitStatus.INVALID) {
+      throw new IOException("Container already completed");
+    }
+    rcs.status = RecoveredContainerStatus.LAUNCHED;
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.killed = true;
+  }
+
+  @Override
+  public void
storeContainerCompleted(ContainerId containerId, int exitCode) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.status = RecoveredContainerStatus.COMPLETED; + rcs.exitCode = exitCode; + } + + @Override + public void removeContainer(ContainerId containerId) throws IOException { + containerStates.remove(containerId); + } + + private RecoveredContainerState getRecoveredContainerState( + ContainerId containerId) throws IOException { + RecoveredContainerState rcs = containerStates.get(containerId); + if (rcs == null) { + throw new IOException("No start request for " + containerId); + } + return rcs; + } private LocalResourceTrackerState loadTrackerState(TrackerState ts) { LocalResourceTrackerState result = new LocalResourceTrackerState(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 84a69069a30..d2cc36323a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -25,18 +25,30 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -44,9 +56,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; +import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; @@ -192,6 +207,115 @@ public class TestNMLeveldbStateStoreService { assertEquals(appId1, state.getFinishedApplications().get(0)); } + @Test + public void testContainerStorage() throws IOException { + // test empty when no state + List recoveredContainers = + stateStore.loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + + // create a container request + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 4); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 5); + LocalResource lrsrc = LocalResource.newInstance( + URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, + 1234567890L); + Map localResources = + new HashMap(); + localResources.put("rsrc", lrsrc); + Map env = new HashMap(); + env.put("somevar", "someval"); + List containerCmds = new ArrayList(); + containerCmds.add("somecmd"); + containerCmds.add("somearg"); + Map serviceData = new HashMap(); + serviceData.put("someservice", + ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer containerTokens = + ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + Map acls = + new HashMap(); + acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); + acls.put(ApplicationAccessType.MODIFY_APP, "moduser"); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, env, containerCmds, serviceData, containerTokens, + acls); + Resource containerRsrc = Resource.newInstance(1357, 3); 
+ ContainerTokenIdentifier containerTokenId = + new ContainerTokenIdentifier(containerId, "host", "user", + containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7), + 13579); + Token containerToken = Token.newInstance(containerTokenId.getBytes(), + ContainerTokenIdentifier.KIND.toString(), "password".getBytes(), + "tokenservice"); + StartContainerRequest containerReq = + StartContainerRequest.newInstance(clc, containerToken); + + // store a container and verify recovered + stateStore.storeContainer(containerId, containerReq); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertTrue(rcs.getDiagnostics().isEmpty()); + + // launch the container, add some diagnostics, and verify recovered + StringBuilder diags = new StringBuilder(); + stateStore.storeContainerLaunched(containerId); + diags.append("some diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // mark the container killed, add some more diags, and verify recovered + diags.append("some more diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerKilled(containerId); + restartStateStore(); + recoveredContainers = 
stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertTrue(rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // add yet more diags, mark container completed, and verify recovered + diags.append("some final diags"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerCompleted(containerId, 21); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); + assertEquals(21, rcs.getExitCode()); + assertTrue(rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // remove the container and verify not recovered + stateStore.removeContainer(containerId); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + } + @Test public void testStartResourceLocalization() throws IOException { String user = "somebody"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index 4d6ac7788d5..a7006e0fb5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -209,7 +209,7 @@ public class TestNMWebServer { BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123, "password".getBytes(), currentTime); Container container = - new ContainerImpl(conf, dispatcher, launchContext, + new ContainerImpl(conf, dispatcher, stateStore, launchContext, null, metrics, BuilderUtils.newContainerTokenIdentifier(containerToken)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index cf81d727138..9ead898db40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -93,9 +93,9 @@ public class RMNodeImpl implements RMNode, EventHandler { private final RMContext context; private final String hostName; private final int commandPort; - private final int httpPort; + private int httpPort; private final String nodeAddress; // The containerManager address - private final String httpAddress; + private String httpAddress; private volatile ResourceOption resourceOption; private final Node node; @@ -521,37 +521,15 @@ public class RMNodeImpl implements RMNode, EventHandler { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - // Kill containers since node is rejoining. 
- rmNode.nodeUpdateQueue.clear(); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeRemovedSchedulerEvent(rmNode)); - RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); - if (rmNode.getTotalCapability().equals(newNode.getTotalCapability()) - && rmNode.getHttpPort() == newNode.getHttpPort()) { - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState() != NodeState.UNHEALTHY) { - // Only add new node if old state is not UNHEALTHY - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - } - } else { - // Reconnected node differs, so replace old node and start new node - switch (rmNode.getState()) { - case RUNNING: - ClusterMetrics.getMetrics().decrNumActiveNodes(); - break; - case UNHEALTHY: - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); - break; - } - rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); - rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null, null)); - } + rmNode.httpPort = newNode.getHttpPort(); + rmNode.httpAddress = newNode.getHttpAddress(); + rmNode.resourceOption = newNode.getResourceOption(); + + // Reset heartbeat ID since node just restarted. 
+ rmNode.getLastNodeHeartBeatResponse().setResponseId(0); if (null != reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 572a9f92e40..d3df93fcc6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -153,14 +153,17 @@ public class SchedulerUtils { * @param rmNode RMNode with new resource view * @param clusterResource the cluster's resource that need to update * @param log Scheduler's log for resource change + * @return true if the resources have changed */ - public static void updateResourceIfChanged(SchedulerNode node, + public static boolean updateResourceIfChanged(SchedulerNode node, RMNode rmNode, Resource clusterResource, Log log) { + boolean result = false; Resource oldAvailableResource = node.getAvailableResource(); Resource newAvailableResource = Resources.subtract( rmNode.getTotalCapability(), node.getUsedResource()); if (!newAvailableResource.equals(oldAvailableResource)) { + result = true; Resource deltaResource = Resources.subtract(newAvailableResource, oldAvailableResource); // Reflect resource change to scheduler node. 
@@ -176,6 +179,8 @@ public class SchedulerUtils { + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: " + deltaResource.getMemory() +"MB"); } + + return result; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 26812388aaf..65ff81c0216 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -783,7 +783,10 @@ public class CapacityScheduler extends FiCaSchedulerNode node = getNode(nm.getNodeID()); // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); + if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, + LOG)) { + root.updateClusterResource(clusterResource); + } List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4360aefb066..48276205bf9 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -595,7 +595,7 @@ public class TestResourceTrackerService { // reconnect of node with changed capability nm1 = rm.registerNode("host2:5678", 10240); dispatcher.await(); - response = nm2.nodeHeartbeat(true); + response = nm1.nodeHeartbeat(true); dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());