diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 47f689c67be..2183d3f9bf1 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -206,6 +206,9 @@ Release 2.8.0 - UNRELEASED
yarn.scheduler.capacity.node-locality-delay in code and default xml file.
(Nijel SF via vinodkv)
+ YARN-2331. Distinguish shutdown during supervision vs. shutdown for
+ rolling upgrade. (Jason Lowe via xgong)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5291ff22ded..0851f3c554e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1158,6 +1158,10 @@ public class YarnConfiguration extends Configuration {
public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
+ public static final String NM_RECOVERY_SUPERVISED =
+ NM_RECOVERY_PREFIX + "supervised";
+ public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;
+
////////////////////////////////
// Web Proxy Configs
////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e1e0ebd3d77..4d74f7622f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1192,6 +1192,15 @@
     <value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
   </property>
 
+  <property>
+    <description>Whether the nodemanager is running under supervision. A
+    nodemanager that supports recovery and is running under supervision
+    will not try to clean up containers as it exits, with the assumption
+    that it will be immediately restarted and will recover containers.</description>
+    <name>yarn.nodemanager.recovery.supervised</name>
+    <value>false</value>
+  </property>
+
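
The new knob only matters when NM recovery is already enabled; with recovery off, the NodeManager always cleans up on stop. A minimal sketch of wiring both flags together, using only the constants added above (a real deployment would set the equivalent properties in yarn-site.xml; the class name here is illustrative):

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SupervisedRecoverySetup {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Supervision is only consulted when recovery itself is on.
        conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
        conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
        // Prints true; DEFAULT_NM_RECOVERY_SUPERVISED keeps the default false.
        System.out.println(conf.getBoolean(
            YarnConfiguration.NM_RECOVERY_SUPERVISED,
            YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED));
      }
    }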
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index c48df64bd9f..494fa8fbd88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -530,8 +530,11 @@ public class ContainerManagerImpl extends CompositeService implements
if (this.context.getNMStateStore().canRecover()
&& !this.context.getDecommissioned()) {
- // do not cleanup apps as they can be recovered on restart
- return;
+ if (getConfig().getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED,
+ YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED)) {
+ // do not cleanup apps as they can be recovered on restart
+ return;
+ }
}
    List<ApplicationId> appIds =
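
Condensed, the serviceStop change above makes app cleanup hinge on three conditions rather than two. A hedged standalone sketch of that predicate (ShutdownPolicy is an illustrative name, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    final class ShutdownPolicy {
      // Applications survive a NodeManager stop only when state can be
      // recovered, the node is not decommissioned, and a supervisor is
      // expected to restart the process immediately.
      static boolean preserveApps(boolean canRecover, boolean decommissioned,
          Configuration conf) {
        return canRecover && !decommissioned
            && conf.getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED,
                YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
      }
    }

Without the supervised term, a plain daemon stop on a recovery-enabled node would leave orphaned containers behind; with it, only a supervised restart (the rolling-upgrade case) skips cleanup.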
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 0018d564214..dbbfcd5deb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -145,10 +145,13 @@ public class LogAggregationService extends AbstractService implements
private void stopAggregators() {
threadPool.shutdown();
+ boolean supervised = getConfig().getBoolean(
+ YarnConfiguration.NM_RECOVERY_SUPERVISED,
+ YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
// if recovery on restart is supported then leave outstanding aggregations
// to the next restart
boolean shouldAbort = context.getNMStateStore().canRecover()
- && !context.getDecommissioned();
+ && !context.getDecommissioned() && supervised;
// politely ask to finish
for (AppLogAggregator aggregator : appLogAggregators.values()) {
if (shouldAbort) {
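
LogAggregationService applies the same three-way test: when an immediate restart is expected, in-flight aggregations are aborted and left for the recovered NodeManager; otherwise they are finished so logs are not lost. Roughly, assuming the hypothetical helper sketched above:

    // Illustrative only; mirrors the patched stopAggregators() logic.
    boolean shouldAbort = ShutdownPolicy.preserveApps(
        context.getNMStateStore().canRecover(),
        context.getDecommissioned(), getConfig());
    for (AppLogAggregator aggregator : appLogAggregators.values()) {
      if (shouldAbort) {
        aggregator.abortLogAggregation();   // leave the work to the restarted NM
      } else {
        aggregator.finishLogAggregation();  // flush outstanding logs before exit
      }
    }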
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index c45ffbb93dd..781950e08d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -22,7 +22,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@@ -68,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -82,27 +87,18 @@ public class TestContainerManagerRecovery {
public void testApplicationRecovery() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
- Context context = new NMContext(new NMContainerTokenSecretManager(
- conf), new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), stateStore);
+ Context context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context);
cm.init(conf);
cm.start();
- // simulate registration with RM
- MasterKey masterKey = new MasterKeyPBImpl();
- masterKey.setKeyId(123);
- masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
- .byteValue() }));
- context.getContainerTokenSecretManager().setMasterKey(masterKey);
- context.getNMTokenSecretManager().setMasterKey(masterKey);
-
// add an application by starting a container
String appUser = "app_user1";
String modUser = "modify_user1";
@@ -155,9 +151,7 @@ public class TestContainerManagerRecovery {
// reset container manager and verify app recovered with proper acls
cm.stop();
- context = new NMContext(new NMContainerTokenSecretManager(
- conf), new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), stateStore);
+ context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
@@ -201,9 +195,7 @@ public class TestContainerManagerRecovery {
// restart and verify app is marked for finishing
cm.stop();
- context = new NMContext(new NMContainerTokenSecretManager(
- conf), new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), stateStore);
+ context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
@@ -233,9 +225,7 @@ public class TestContainerManagerRecovery {
// restart and verify app is no longer present after recovery
cm.stop();
- context = new NMContext(new NMContainerTokenSecretManager(
- conf), new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), stateStore);
+ context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
@@ -243,6 +233,95 @@ public class TestContainerManagerRecovery {
cm.stop();
}
+ @Test
+ public void testContainerCleanupOnShutdown() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+    Map<String, LocalResource> localResources = Collections.emptyMap();
+    Map<String, String> containerEnv = Collections.emptyMap();
+    List<String> containerCmds = Collections.emptyList();
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+ Credentials containerCreds = new Credentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ containerCreds.writeTokenStorageToStream(dob);
+ ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
+ dob.getLength());
+    Map<ApplicationAccessType, String> acls = Collections.emptyMap();
+ ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+ localResources, containerEnv, containerCmds, serviceData,
+ containerTokens, acls);
+ // create the logAggregationContext
+ LogAggregationContext logAggregationContext =
+ LogAggregationContext.newInstance("includePattern", "excludePattern");
+
+ // verify containers are stopped on shutdown without recovery
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
+ conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
+ Context context = createContext(conf, new NMNullStateStoreService());
+ ContainerManagerImpl cm = spy(createContainerManager(context));
+ cm.init(conf);
+ cm.start();
+ StartContainersResponse startResponse = startContainer(context, cm, cid,
+ clc, logAggregationContext);
+ assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+ cm.stop();
+ verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
+
+ // verify containers are stopped on shutdown with unsupervised recovery
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
+ NMMemoryStateStoreService memStore = new NMMemoryStateStoreService();
+ memStore.init(conf);
+ memStore.start();
+ context = createContext(conf, memStore);
+ cm = spy(createContainerManager(context));
+ cm.init(conf);
+ cm.start();
+ startResponse = startContainer(context, cm, cid,
+ clc, logAggregationContext);
+ assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+ cm.stop();
+ memStore.close();
+ verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
+
+ // verify containers are not stopped on shutdown with supervised recovery
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+ memStore = new NMMemoryStateStoreService();
+ memStore.init(conf);
+ memStore.start();
+ context = createContext(conf, memStore);
+ cm = spy(createContainerManager(context));
+ cm.init(conf);
+ cm.start();
+ startResponse = startContainer(context, cm, cid,
+ clc, logAggregationContext);
+ assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+ cm.stop();
+ memStore.close();
+ verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
+ }
+
+ private NMContext createContext(YarnConfiguration conf,
+ NMStateStoreService stateStore) {
+ NMContext context = new NMContext(new NMContainerTokenSecretManager(
+ conf), new NMTokenSecretManagerInNM(), null,
+ new ApplicationACLsManager(conf), stateStore);
+
+ // simulate registration with RM
+ MasterKey masterKey = new MasterKeyPBImpl();
+ masterKey.setKeyId(123);
+ masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
+ .byteValue() }));
+ context.getContainerTokenSecretManager().setMasterKey(masterKey);
+ context.getNMTokenSecretManager().setMasterKey(masterKey);
+ return context;
+ }
+
private StartContainersResponse startContainer(Context context,
final ContainerManagerImpl cm, ContainerId cid,
ContainerLaunchContext clc, LogAggregationContext logAggregationContext)