diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index b5727aa0fea..15e1ceac653 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -622,7 +623,20 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); - return GetContainersResponse.newInstance(null); + ApplicationAttemptId attemptId = request.getApplicationAttemptId(); + List containers = new ArrayList<>(); + synchronized (applicationContainerIdMap) { + // Return the list of running containers that were being tracked for this + // application + Assert.assertTrue("The application id is NOT registered: " + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List ids = applicationContainerIdMap.get(attemptId); + for (ContainerId c : ids) { + containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0, + null, null, 0, null, null)); + } + } + return GetContainersResponse.newInstance(containers); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index ebd85bf44f7..815e39bffff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -128,11 +128,8 @@ public class AMRMProxyService extends CompositeService implements new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore()); this.secretManager.init(conf); - // Both second app attempt and NM restart within Federation need registry if (conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED, - YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED) - || conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, - YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) { + YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED)) { this.registry = FederationStateStoreFacade.createInstance(conf, YarnConfiguration.YARN_REGISTRY_CLASS, YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index ef5e0619240..9a53a50cce1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -37,16 +37,23 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.PreemptionContract; @@ -59,6 +66,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; @@ -90,6 +99,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private static final Logger LOG = LoggerFactory.getLogger(FederationInterceptor.class); + public static final String NMSS_CLASS_PREFIX = "FederationInterceptor/"; + + public static final String NMSS_REG_REQUEST_KEY = + NMSS_CLASS_PREFIX + "registerRequest"; + public static final String NMSS_REG_RESPONSE_KEY = + NMSS_CLASS_PREFIX + "registerResponse"; + + /* + * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn + * Registry. Otherwise if NM recovery is enabled, the UAM token are store in + * local NMSS instead under this directory name. + */ + public static final String NMSS_SECONDARY_SC_PREFIX = + NMSS_CLASS_PREFIX + "secondarySC/"; + public static final String STRING_TO_BYTE_FORMAT = "UTF-8"; + /** * The home sub-cluster is the sub-cluster where the AM container is running * in. @@ -187,14 +212,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } catch (Exception ex) { throw new YarnRuntimeException(ex); } - // Add all app tokens for Yarn Registry access - if (this.registryClient != null && appContext.getCredentials() != null) { - this.appOwner.addCredentials(appContext.getCredentials()); + + if (appContext.getRegistryClient() != null) { + this.registryClient = new FederationRegistryClient(conf, + appContext.getRegistryClient(), this.appOwner); + // Add all app tokens for Yarn Registry access + if (appContext.getCredentials() != null) { + this.appOwner.addCredentials(appContext.getCredentials()); + } } this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); - this.homeRM = createHomeRMProxy(appContext); + this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class, + this.appOwner); this.federationFacade = FederationStateStoreFacade.getInstance(); this.subClusterResolver = this.federationFacade.getSubClusterResolver(); @@ -204,11 +235,137 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.uamPool.init(conf); this.uamPool.start(); + } - if (appContext.getRegistryClient() != null) { - this.registryClient = new FederationRegistryClient(conf, - appContext.getRegistryClient(), this.appOwner); + @Override + public void recover(Map recoveredDataMap) { + super.recover(recoveredDataMap); + LOG.info("Recovering data for FederationInterceptor"); + if (recoveredDataMap == null) { + return; } + + ApplicationAttemptId attemptId = + getApplicationContext().getApplicationAttemptId(); + try { + if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) { + RegisterApplicationMasterRequestProto pb = + RegisterApplicationMasterRequestProto + .parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY)); + this.amRegistrationRequest = + new RegisterApplicationMasterRequestPBImpl(pb); + LOG.info("amRegistrationRequest recovered for {}", attemptId); + } + if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) { + RegisterApplicationMasterResponseProto pb = + RegisterApplicationMasterResponseProto + .parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY)); + this.amRegistrationResponse = + new RegisterApplicationMasterResponsePBImpl(pb); + LOG.info("amRegistrationResponse recovered for {}", attemptId); + } + + // Recover UAM amrmTokens from registry or NMSS + Map> uamMap; + if (this.registryClient != null) { + uamMap = this.registryClient + .loadStateFromRegistry(attemptId.getApplicationId()); + LOG.info("Found {} existing UAMs for application {} in Yarn Registry", + uamMap.size(), attemptId.getApplicationId()); + } else { + uamMap = new HashMap<>(); + for (Entry entry : recoveredDataMap.entrySet()) { + if (entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) { + // entry for subClusterId -> UAM amrmToken + String scId = + entry.getKey().substring(NMSS_SECONDARY_SC_PREFIX.length()); + Token amrmToken = new Token<>(); + amrmToken.decodeFromUrlString( + new String(entry.getValue(), STRING_TO_BYTE_FORMAT)); + uamMap.put(scId, amrmToken); + LOG.debug("Recovered UAM in " + scId + " from NMSS"); + } + } + LOG.info("Found {} existing UAMs for application {} in NMStateStore", + uamMap.size(), attemptId.getApplicationId()); + } + + // Re-attach the UAMs + int containers = 0; + for (Map.Entry> entry : uamMap + .entrySet()) { + SubClusterId subClusterId = SubClusterId.newInstance(entry.getKey()); + + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId.getId()); + + try { + this.uamPool.reAttachUAM(subClusterId.getId(), config, + attemptId.getApplicationId(), + this.amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), this.homeSubClusterId.getId(), + entry.getValue()); + + RegisterApplicationMasterResponse response = + this.uamPool.registerApplicationMaster(subClusterId.getId(), + this.amRegistrationRequest); + + // Running containers from secondary RMs + for (Container container : response + .getContainersFromPreviousAttempts()) { + containerIdToSubClusterIdMap.put(container.getId(), subClusterId); + containers++; + } + LOG.info("Recovered {} running containers from UAM in {}", + response.getContainersFromPreviousAttempts().size(), + subClusterId); + + } catch (Exception e) { + LOG.error( + "Error reattaching UAM to " + subClusterId + " for " + attemptId, + e); + } + } + + // Get the running containers from home RM, note that we will also get the + // AM container itself from here. We don't need it, but no harm to put the + // map as well. + UserGroupInformation appSubmitter = UserGroupInformation + .createRemoteUser(getApplicationContext().getUser()); + ApplicationClientProtocol rmClient = + createHomeRMProxy(getApplicationContext(), + ApplicationClientProtocol.class, appSubmitter); + + GetContainersResponse response = + rmClient.getContainers(GetContainersRequest.newInstance(attemptId)); + for (ContainerReport container : response.getContainerList()) { + containerIdToSubClusterIdMap.put(container.getContainerId(), + this.homeSubClusterId); + containers++; + LOG.debug(" From home RM " + this.homeSubClusterId + + " running container " + container.getContainerId()); + } + LOG.info("{} running containers including AM recovered from home RM ", + response.getContainerList().size(), this.homeSubClusterId); + + LOG.info( + "In all {} UAMs {} running containers including AM recovered for {}", + uamMap.size(), containers, attemptId); + + if (this.amRegistrationResponse != null) { + // Initialize the AMRMProxyPolicy + String queue = this.amRegistrationResponse.getQueue(); + this.policyInterpreter = + FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, + getConf(), this.federationFacade, this.homeSubClusterId); + } + } catch (IOException | YarnException e) { + throw new YarnRuntimeException(e); + } + } /** @@ -242,6 +399,19 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // Save the registration request. This will be used for registering with // secondary sub-clusters using UAMs, as well as re-register later this.amRegistrationRequest = request; + if (getNMStateStore() != null) { + try { + RegisterApplicationMasterRequestPBImpl pb = + (RegisterApplicationMasterRequestPBImpl) + this.amRegistrationRequest; + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray()); + } catch (Exception e) { + LOG.error("Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } } /* @@ -278,6 +448,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor { getApplicationContext().getApplicationAttemptId().getApplicationId(); reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId); + if (getNMStateStore() != null) { + try { + RegisterApplicationMasterResponsePBImpl pb = + (RegisterApplicationMasterResponsePBImpl) + this.amRegistrationResponse; + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray()); + } catch (Exception e) { + LOG.error("Error storing AMRMProxy application context entry for " + + getApplicationContext().getApplicationAttemptId(), e); + } + } + // the queue this application belongs will be used for getting // AMRMProxy policy from state store. String queue = this.amRegistrationResponse.getQueue(); @@ -437,6 +621,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (uamResponse.getResponse() == null || !uamResponse.getResponse().getIsUnregistered()) { failedToUnRegister = true; + } else if (getNMStateStore() != null) { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId()); } } catch (Throwable e) { failedToUnRegister = true; @@ -496,6 +684,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } + @VisibleForTesting + protected FederationRegistryClient getRegistryClient() { + return this.registryClient; + } + /** * Create the UAM pool manager for secondary sub-clsuters. For unit test to * override. @@ -510,18 +703,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** - * Returns instance of the ApplicationMasterProtocol proxy class that is used - * to connect to the Home resource manager. + * Create a proxy instance that is used to connect to the Home resource + * manager. * * @param appContext AMRMProxyApplicationContext + * @param protocol the protocol class for the proxy + * @param user the ugi for the proxy + * @param the type of the proxy * @return the proxy created */ - protected ApplicationMasterProtocol createHomeRMProxy( - AMRMProxyApplicationContext appContext) { + protected T createHomeRMProxy(AMRMProxyApplicationContext appContext, + Class protocol, UserGroupInformation user) { try { return FederationProxyProviderUtil.createRMProxy(appContext.getConf(), - ApplicationMasterProtocol.class, this.homeSubClusterId, this.appOwner, - appContext.getAMRMToken()); + protocol, this.homeSubClusterId, user, appContext.getAMRMToken()); } catch (Exception ex) { throw new YarnRuntimeException(ex); } @@ -810,17 +1005,31 @@ public class FederationInterceptor extends AbstractRequestInterceptor { responses.add(response); } - // Save the new AMRMToken for the UAM in registry if present + // Save the new AMRMToken for the UAM if present if (response.getAMRMToken() != null) { Token newToken = ConverterUtils .convertFromYarn(response.getAMRMToken(), (Text) null); - // Update the token in registry + // Update the token in registry or NMSS if (registryClient != null) { registryClient .writeAMRMTokenForUAM( getApplicationContext().getApplicationAttemptId() .getApplicationId(), subClusterId.getId(), newToken); + } else if (getNMStateStore() != null) { + try { + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(), + newToken.encodeToUrlString() + .getBytes(STRING_TO_BYTE_FORMAT)); + } catch (IOException e) { + LOG.error( + "Error storing UAM token as AMRMProxy " + + "context entry in NMSS for " + + getApplicationContext().getApplicationAttemptId(), + e); + } } } @@ -925,12 +1134,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor { successfulRegistrations.put(uamResponse.getSubClusterId(), uamResponse.getResponse()); + // Save the UAM token in registry or NMSS if (registryClient != null) { registryClient.writeAMRMTokenForUAM( getApplicationContext().getApplicationAttemptId() .getApplicationId(), uamResponse.getSubClusterId().getId(), uamResponse.getUamToken()); + } else if (getNMStateStore() != null) { + getNMStateStore().storeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + + uamResponse.getSubClusterId().getId(), + uamResponse.getUamToken().encodeToUrlString() + .getBytes(STRING_TO_BYTE_FORMAT)); } } } catch (Exception e) { @@ -952,11 +1169,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private AllocateResponse mergeAllocateResponses( AllocateResponse homeResponse) { // Timing issue, we need to remove the completed and then save the new ones. - if (LOG.isDebugEnabled()) { - LOG.debug("Remove containers: " - + homeResponse.getCompletedContainersStatuses()); - LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers()); - } removeFinishedContainersFromCache( homeResponse.getCompletedContainersStatuses()); cacheAllocatedContainers(homeResponse.getAllocatedContainers(), @@ -989,6 +1201,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private void removeFinishedContainersFromCache( List finishedContainers) { for (ContainerStatus container : finishedContainers) { + LOG.debug("Completed container {}", container); if (containerIdToSubClusterIdMap .containsKey(container.getContainerId())) { containerIdToSubClusterIdMap.remove(container.getContainerId()); @@ -1146,12 +1359,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private void cacheAllocatedContainers(List containers, SubClusterId subClusterId) { for (Container container : containers) { + LOG.debug("Adding container {}", container); if (containerIdToSubClusterIdMap.containsKey(container.getId())) { SubClusterId existingSubClusterId = containerIdToSubClusterIdMap.get(container.getId()); if (existingSubClusterId.equals(subClusterId)) { - // When RM fails over, the new RM master might send out the same - // container allocation more than once. Just move on in this case. + /* + * When RM fails over, the new RM master might send out the same + * container allocation more than once. + * + * It is also possible because of a recent NM restart with NM recovery + * enabled. We recover running containers from RM. But RM might not + * notified AM of some of these containers yet. When RM dose notify, + * we will already have these containers in the map. + * + * Either case, just warn and move on. + */ LOG.warn( "Duplicate containerID: {} found in the allocated containers" + " from same sub-cluster: {}, so ignoring.", @@ -1226,7 +1449,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private boolean warnIfNotExists(ContainerId containerId, String actionName) { if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) { - LOG.error("AM is trying to {} a container {} that does not exist. ", + LOG.error( + "AM is trying to {} a container {} that does not exist. Might happen " + + "shortly after NM restart when NM recovery is enabled", actionName, containerId.toString()); return false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index da1d047d957..0319dbe1bfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -180,6 +181,20 @@ public abstract class BaseAMRMProxyTest { return new NMContext(null, null, null, null, stateStore, false, this.conf); } + // A utility method for intercepter recover unit test + protected Map recoverDataMapForAppAttempt( + NMStateStoreService nmStateStore, ApplicationAttemptId attemptId) + throws IOException { + RecoveredAMRMProxyState state = nmStateStore.loadAMRMProxyState(); + for (Map.Entry> entry : state + .getAppContexts().entrySet()) { + if (entry.getKey().equals(attemptId)) { + return entry.getValue(); + } + } + return null; + } + protected List getCompletedContainerIds( List containerStatus) { List ret = new ArrayList<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index aa7ed697b87..eefaba1a8bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -449,6 +449,104 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { } } + @Test + public void testRecoverWithAMRMProxyHA() throws Exception { + testRecover(registry); + } + + @Test + public void testRecoverWithoutAMRMProxyHA() throws Exception { + testRecover(null); + } + + public void testRecover(RegistryOperations registryObj) throws Exception { + ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); + userInfo.getUser().doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + interceptor = new TestableFederationInterceptor(); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registryObj)); + interceptor.cleanupRegistry(); + + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Prepare for Federation Interceptor restart and recover + Map recoveredDataMap = + recoverDataMapForAppAttempt(nmStateStore, attemptId); + String scEntry = + FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1"; + if (registryObj == null) { + Assert.assertTrue(recoveredDataMap.containsKey(scEntry)); + } else { + // When AMRMPRoxy HA is enabled, NMSS should not have the UAM token, + // it should be in Registry + Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); + } + + // Preserve the mock RM instances + MockResourceManagerFacade homeRM = interceptor.getHomeRM(); + ConcurrentHashMap secondaries = + interceptor.getSecondaryRMs(); + + // Create a new intercepter instance and recover + interceptor = new TestableFederationInterceptor(homeRM, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registryObj)); + interceptor.recover(recoveredDataMap); + + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + // After the application succeeds, the registry/NMSS entry should be + // cleaned up + if (registryObj != null) { + Assert.assertEquals(0, + interceptor.getRegistryClient().getAllApplications().size()); + } else { + recoveredDataMap = + recoverDataMapForAppAttempt(nmStateStore, attemptId); + Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); + } + return null; + } + }); + } + @Test public void testRequestInterceptorChainCreation() throws Exception { RequestInterceptor root = @@ -636,6 +734,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor.finishApplicationMaster(finishReq); Assert.assertNotNull(finshResponse); Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + // After the application succeeds, the registry entry should be deleted + if (interceptor.getRegistryClient() != null) { + Assert.assertEquals(0, + interceptor.getRegistryClient().getAllApplications().size()); + } return null; } }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index 23c80ae9090..1088c698619 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -59,16 +58,17 @@ public class TestableFederationInterceptor extends FederationInterceptor { return new TestableUnmanagedAMPoolManager(threadPool); } + @SuppressWarnings("unchecked") @Override - protected ApplicationMasterProtocol createHomeRMProxy( - AMRMProxyApplicationContext appContext) { + protected T createHomeRMProxy(AMRMProxyApplicationContext appContext, + Class protocol, UserGroupInformation user) { synchronized (this) { if (mockRm == null) { mockRm = new MockResourceManagerFacade( new YarnConfiguration(super.getConf()), 0); } } - return mockRm; + return (T) mockRm; } @SuppressWarnings("unchecked")