YARN-2331. Distinguish shutdown during supervision vs. shutdown for

rolling upgrade. Contributed by Jason Lowe
This commit is contained in:
Xuan 2015-05-08 15:10:43 -07:00
parent d0e75e60fb
commit 088156de43
6 changed files with 124 additions and 23 deletions

View File

@ -206,6 +206,9 @@ Release 2.8.0 - UNRELEASED
yarn.scheduler.capacity.node-locality-delay in code and default xml file.
(Nijel SF via vinodkv)
YARN-2331. Distinguish shutdown during supervision vs. shutdown for
rolling upgrade. (Jason Lowe via xgong)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -1158,6 +1158,10 @@ public class YarnConfiguration extends Configuration {
public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
public static final String NM_RECOVERY_SUPERVISED =
NM_RECOVERY_PREFIX + "supervised";
public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;
////////////////////////////////
// Web Proxy Configs
////////////////////////////////

View File

@ -1192,6 +1192,15 @@
<value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
</property>
<property>
<description>Whether the nodemanager is running under supervision. A
nodemanager that supports recovery and is running under supervision
will not try to cleanup containers as it exits with the assumption
it will be immediately be restarted and recover containers.</description>
<name>yarn.nodemanager.recovery.supervised</name>
<value>false</value>
</property>
<!--Docker configuration-->
<property>

View File

@ -530,8 +530,11 @@ public class ContainerManagerImpl extends CompositeService implements
if (this.context.getNMStateStore().canRecover()
&& !this.context.getDecommissioned()) {
// do not cleanup apps as they can be recovered on restart
return;
if (getConfig().getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED,
YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED)) {
// do not cleanup apps as they can be recovered on restart
return;
}
}
List<ApplicationId> appIds =

View File

@ -145,10 +145,13 @@ public class LogAggregationService extends AbstractService implements
private void stopAggregators() {
threadPool.shutdown();
boolean supervised = getConfig().getBoolean(
YarnConfiguration.NM_RECOVERY_SUPERVISED,
YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
// if recovery on restart is supported then leave outstanding aggregations
// to the next restart
boolean shouldAbort = context.getNMStateStore().canRecover()
&& !context.getDecommissioned();
&& !context.getDecommissioned() && supervised;
// politely ask to finish
for (AppLogAggregator aggregator : appLogAggregators.values()) {
if (shouldAbort) {

View File

@ -22,7 +22,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@ -68,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@ -82,27 +87,18 @@ public class TestContainerManagerRecovery {
public void testApplicationRecovery() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
Context context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), stateStore);
Context context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context);
cm.init(conf);
cm.start();
// simulate registration with RM
MasterKey masterKey = new MasterKeyPBImpl();
masterKey.setKeyId(123);
masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
.byteValue() }));
context.getContainerTokenSecretManager().setMasterKey(masterKey);
context.getNMTokenSecretManager().setMasterKey(masterKey);
// add an application by starting a container
String appUser = "app_user1";
String modUser = "modify_user1";
@ -155,9 +151,7 @@ public class TestContainerManagerRecovery {
// reset container manager and verify app recovered with proper acls
cm.stop();
context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), stateStore);
context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
@ -201,9 +195,7 @@ public class TestContainerManagerRecovery {
// restart and verify app is marked for finishing
cm.stop();
context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), stateStore);
context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
@ -233,9 +225,7 @@ public class TestContainerManagerRecovery {
// restart and verify app is no longer present after recovery
cm.stop();
context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), stateStore);
context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
@ -243,6 +233,95 @@ public class TestContainerManagerRecovery {
cm.stop();
}
@Test
public void testContainerCleanupOnShutdown() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
Map<String, String> containerEnv = Collections.emptyMap();
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
containerCreds.writeTokenStorageToStream(dob);
ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
dob.getLength());
Map<ApplicationAccessType, String> acls = Collections.emptyMap();
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, containerEnv, containerCmds, serviceData,
containerTokens, acls);
// create the logAggregationContext
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance("includePattern", "excludePattern");
// verify containers are stopped on shutdown without recovery
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
Context context = createContext(conf, new NMNullStateStoreService());
ContainerManagerImpl cm = spy(createContainerManager(context));
cm.init(conf);
cm.start();
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
// verify containers are stopped on shutdown with unsupervised recovery
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
NMMemoryStateStoreService memStore = new NMMemoryStateStoreService();
memStore.init(conf);
memStore.start();
context = createContext(conf, memStore);
cm = spy(createContainerManager(context));
cm.init(conf);
cm.start();
startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
memStore.close();
verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
// verify containers are not stopped on shutdown with supervised recovery
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
memStore = new NMMemoryStateStoreService();
memStore.init(conf);
memStore.start();
context = createContext(conf, memStore);
cm = spy(createContainerManager(context));
cm.init(conf);
cm.start();
startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
memStore.close();
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
}
private NMContext createContext(YarnConfiguration conf,
NMStateStoreService stateStore) {
NMContext context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), stateStore);
// simulate registration with RM
MasterKey masterKey = new MasterKeyPBImpl();
masterKey.setKeyId(123);
masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
.byteValue() }));
context.getContainerTokenSecretManager().setMasterKey(masterKey);
context.getNMTokenSecretManager().setMasterKey(masterKey);
return context;
}
private StartContainersResponse startContainer(Context context,
final ContainerManagerImpl cm, ContainerId cid,
ContainerLaunchContext clc, LogAggregationContext logAggregationContext)