From ed3109136100a21d971484f242d80f2a7e7d337d Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Mon, 20 Nov 2017 14:21:58 -0800 Subject: [PATCH] YARN-6128. Add support for AMRMProxy HA. (Botong Huang via Subru). --- .../hadoop/yarn/conf/YarnConfiguration.java | 13 + .../src/main/resources/yarn-default.xml | 21 ++ .../hadoop-yarn-server-common/pom.xml | 5 + .../utils/FederationRegistryClient.java | 338 ++++++++++++++++++ .../server/uam/UnmanagedAMPoolManager.java | 142 ++++++-- .../uam/UnmanagedApplicationManager.java | 212 ++++++----- .../yarn/server/utils/AMRMClientUtils.java | 30 +- .../server/MockResourceManagerFacade.java | 100 +++--- .../utils/TestFederationRegistryClient.java | 90 +++++ .../uam/TestUnmanagedApplicationManager.java | 100 +++++- .../AMRMProxyApplicationContext.java | 16 + .../AMRMProxyApplicationContextImpl.java | 35 +- .../amrmproxy/AMRMProxyService.java | 83 ++++- .../amrmproxy/FederationInterceptor.java | 221 +++++++++++- .../ContainerManagerImpl.java | 9 +- .../amrmproxy/BaseAMRMProxyTest.java | 12 +- .../amrmproxy/TestAMRMProxyService.java | 21 +- .../amrmproxy/TestFederationInterceptor.java | 126 ++++++- .../TestableFederationInterceptor.java | 29 +- .../hadoop/yarn/server/MiniYARNCluster.java | 6 +- .../src/site/markdown/Federation.md | 11 +- 21 files changed, 1342 insertions(+), 278 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a97dc57ea21..edeec9f2d6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1948,6 +1948,9 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = "org.apache.hadoop.yarn.server.nodemanager.amrmproxy." + "DefaultRequestInterceptor"; + public static final String AMRM_PROXY_HA_ENABLED = NM_PREFIX + + "amrmproxy.ha.enable"; + public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false; /** * Default platform-agnostic CLASSPATH for YARN applications. A @@ -2790,6 +2793,11 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; + public static final String FEDERATION_REGISTRY_BASE_KEY = + FEDERATION_PREFIX + "registry.base-dir"; + public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = + "yarnfederation/"; + // 5 minutes public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; @@ -2947,6 +2955,11 @@ public class YarnConfiguration extends Configuration { // Other Configs //////////////////////////////// + public static final String YARN_REGISTRY_CLASS = + YARN_PREFIX + "registry.class"; + public static final String DEFAULT_YARN_REGISTRY_CLASS = + "org.apache.hadoop.registry.client.impl.FSRegistryOperationsService"; + /** * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead. * The interval of the yarn client's querying application state after diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 46fb7c76422..71dd72aa149 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2815,7 +2815,20 @@ 300 + + The registry base directory for federation. + yarn.federation.registry.base-dir + yarnfederation/ + + + + + The registry implementation to use. + yarn.registry.class + org.apache.hadoop.registry.client.impl.FSRegistryOperationsService + + The interval that the yarn client library uses to poll the completion status of the asynchronous API of application client protocol. @@ -2976,6 +2989,14 @@ org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor + + + Whether AMRMProxy HA is enabled. + + yarn.nodemanager.amrmproxy.ha.enable + false + + Setting that controls whether distributed scheduling is enabled. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 47265539575..0af5a9a6c32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -67,6 +67,11 @@ test + + org.apache.hadoop + hadoop-yarn-registry + + com.google.guava guava diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java new file mode 100644 index 00000000000..662431836a7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Helper class that handles reads and writes to Yarn Registry to support UAM HA + * and second attempt. + */ +public class FederationRegistryClient { + private static final Logger LOG = + LoggerFactory.getLogger(FederationRegistryClient.class); + + private RegistryOperations registry; + + private UserGroupInformation user; + + // AppId -> SubClusterId -> UAM token + private Map>> + appSubClusterTokenMap; + + // Structure in registry: // -> UAMToken + private String registryBaseDir; + + public FederationRegistryClient(Configuration conf, + RegistryOperations registry, UserGroupInformation user) { + this.registry = registry; + this.user = user; + this.appSubClusterTokenMap = new ConcurrentHashMap<>(); + this.registryBaseDir = + conf.get(YarnConfiguration.FEDERATION_REGISTRY_BASE_KEY, + YarnConfiguration.DEFAULT_FEDERATION_REGISTRY_BASE_KEY); + LOG.info("Using registry {} with base directory: {}", + this.registry.getClass().getName(), this.registryBaseDir); + } + + /** + * Get the list of known applications in the registry. + * + * @return the list of known applications + */ + public List getAllApplications() { + // Suppress the exception here because it is valid that the entry does not + // exist + List applications = null; + try { + applications = listDirRegistry(this.registry, this.user, + getRegistryKey(null, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + if (applications == null) { + // It is valid for listDirRegistry to return null + return new ArrayList<>(); + } + return applications; + } + + /** + * For testing, delete all application records in registry. + */ + @VisibleForTesting + public void cleanAllApplications() { + try { + removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null), + true, false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from removeKeyRegistry", e); + } + } + + /** + * Write/update the UAM token for an application and a sub-cluster. + * + * @param subClusterId sub-cluster id of the token + * @param token the UAM of the application + * @return whether the amrmToken is added or updated to a new value + */ + public boolean writeAMRMTokenForUAM(ApplicationId appId, + String subClusterId, Token token) { + Map> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + if (subClusterTokenMap == null) { + subClusterTokenMap = new ConcurrentHashMap<>(); + this.appSubClusterTokenMap.put(appId, subClusterTokenMap); + } + + boolean update = !token.equals(subClusterTokenMap.get(subClusterId)); + if (!update) { + LOG.debug("Same amrmToken received from {}, skip writing registry for {}", + subClusterId, appId); + return update; + } + + LOG.info("Writing/Updating amrmToken for {} to registry for {}", + subClusterId, appId); + try { + // First, write the token entry + writeRegistry(this.registry, this.user, + getRegistryKey(appId, subClusterId), token.encodeToUrlString(), true); + + // Then update the subClusterTokenMap + subClusterTokenMap.put(subClusterId, token); + } catch (YarnException | IOException e) { + LOG.error( + "Failed writing AMRMToken to registry for subcluster " + subClusterId, + e); + } + return update; + } + + /** + * Load the information of one application from registry. + * + * @param appId application id + * @return the sub-cluster to UAM token mapping + */ + public Map> + loadStateFromRegistry(ApplicationId appId) { + Map> retMap = new HashMap<>(); + // Suppress the exception here because it is valid that the entry does not + // exist + List subclusters = null; + try { + subclusters = listDirRegistry(this.registry, this.user, + getRegistryKey(appId, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + + if (subclusters == null) { + LOG.info("Application {} does not exist in registry", appId); + return retMap; + } + + // Read the amrmToken for each sub-cluster with an existing UAM + for (String scId : subclusters) { + LOG.info("Reading amrmToken for subcluster {} for {}", scId, appId); + String key = getRegistryKey(appId, scId); + try { + String tokenString = readRegistry(this.registry, this.user, key, true); + if (tokenString == null) { + throw new YarnException("Null string from readRegistry key " + key); + } + Token amrmToken = new Token<>(); + amrmToken.decodeFromUrlString(tokenString); + // Clear the service field, as if RM just issued the token + amrmToken.setService(new Text()); + + retMap.put(scId, amrmToken); + } catch (Exception e) { + LOG.error("Failed reading registry key " + key + + ", skipping subcluster " + scId, e); + } + } + + // Override existing map if there + this.appSubClusterTokenMap.put(appId, new ConcurrentHashMap<>(retMap)); + return retMap; + } + + /** + * Remove an application from registry. + * + * @param appId application id + */ + public void removeAppFromRegistry(ApplicationId appId) { + Map> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + LOG.info("Removing all registry entries for {}", appId); + + if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) { + return; + } + + // Lastly remove the application directory + String key = getRegistryKey(appId, null); + try { + removeKeyRegistry(this.registry, this.user, key, true, true); + subClusterTokenMap.clear(); + } catch (YarnException e) { + LOG.error("Failed removing registry directory key " + key, e); + } + } + + private String getRegistryKey(ApplicationId appId, String fileName) { + if (appId == null) { + return this.registryBaseDir; + } + if (fileName == null) { + return this.registryBaseDir + appId.toString(); + } + return this.registryBaseDir + appId.toString() + "/" + fileName; + } + + private String readRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + // Use the ugi loaded with app credentials to access registry + String result = ugi.doAs(new PrivilegedAction() { + @Override + public String run() { + try { + ServiceRecord value = registryImpl.resolve(key); + if (value != null) { + return value.description; + } + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry resolve key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry resolve key " + key + " failed"); + } + return result; + } + + private void removeKeyRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean recursive, + final boolean throwIfFails) throws YarnException { + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction() { + @Override + public Boolean run() { + try { + registryImpl.delete(key, recursive); + return true; + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry remove key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry remove key " + key + " failed"); + } + } + + /** + * Write registry entry, override if exists. + */ + private void writeRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final String value, + final boolean throwIfFails) throws YarnException { + + final ServiceRecord recordValue = new ServiceRecord(); + recordValue.description = value; + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction() { + @Override + public Boolean run() { + try { + registryImpl.bind(key, recordValue, BindFlags.OVERWRITE); + return true; + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry write key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry write key " + key + " failed"); + } + } + + /** + * List the sub directories in the given directory. + */ + private List listDirRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + List result = ugi.doAs(new PrivilegedAction>() { + @Override + public List run() { + try { + return registryImpl.list(key); + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry list key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry list key " + key + " failed"); + } + return result; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 08aee77fe6d..0c012176392 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -44,9 +45,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; @@ -67,7 +68,7 @@ public class UnmanagedAMPoolManager extends AbstractService { // Map from uamId to UAM instances private Map unmanagedAppMasterMap; - private Map attemptIdMap; + private Map appIdMap; private ExecutorService threadpool; @@ -82,7 +83,7 @@ public class UnmanagedAMPoolManager extends AbstractService { this.threadpool = Executors.newCachedThreadPool(); } this.unmanagedAppMasterMap = new ConcurrentHashMap<>(); - this.attemptIdMap = new ConcurrentHashMap<>(); + this.appIdMap = new ConcurrentHashMap<>(); super.serviceStart(); } @@ -114,7 +115,7 @@ public class UnmanagedAMPoolManager extends AbstractService { public KillApplicationResponse call() throws Exception { try { LOG.info("Force-killing UAM id " + uamId + " for application " - + attemptIdMap.get(uamId)); + + appIdMap.get(uamId)); return unmanagedAppMasterMap.remove(uamId).forceKillApplication(); } catch (Exception e) { LOG.error("Failed to kill unmanaged application master", e); @@ -132,7 +133,7 @@ public class UnmanagedAMPoolManager extends AbstractService { LOG.error("Failed to kill unmanaged application master", e); } } - this.attemptIdMap.clear(); + this.appIdMap.clear(); super.serviceStop(); } @@ -145,13 +146,18 @@ public class UnmanagedAMPoolManager extends AbstractService { * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery. + * @see ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean) * @return uamId for the UAM * @throws YarnException if registerApplicationMaster fails * @throws IOException if registerApplicationMaster fails */ public String createAndRegisterNewUAM( RegisterApplicationMasterRequest registerRequest, Configuration conf, - String queueName, String submitter, String appNameSuffix) + String queueName, String submitter, String appNameSuffix, + boolean keepContainersAcrossApplicationAttempts) throws YarnException, IOException { ApplicationId appId = null; ApplicationClientProtocol rmClient; @@ -173,45 +179,52 @@ public class UnmanagedAMPoolManager extends AbstractService { rmClient = null; } - createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId, - queueName, submitter, appNameSuffix); + // Launch the UAM in RM + launchUAM(appId.toString(), conf, appId, queueName, submitter, + appNameSuffix, keepContainersAcrossApplicationAttempts); + + // Register the UAM application + registerApplicationMaster(appId.toString(), registerRequest); + + // Returns the appId as uamId return appId.toString(); } /** - * Create a new UAM and register the application, using the provided uamId and - * appId. + * Launch a new UAM, using the provided uamId and appId. * - * @param uamId identifier for the UAM - * @param registerRequest RegisterApplicationMasterRequest + * @param uamId uam Id * @param conf configuration for this UAM * @param appId application id for the UAM * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM - * @return RegisterApplicationMasterResponse - * @throws YarnException if registerApplicationMaster fails - * @throws IOException if registerApplicationMaster fails + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery. + * @see ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean) + * @return UAM token + * @throws YarnException if fails + * @throws IOException if fails */ - public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, - RegisterApplicationMasterRequest registerRequest, Configuration conf, + public Token launchUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) throws YarnException, IOException { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) + throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); } - UnmanagedApplicationManager uam = - createUAM(conf, appId, queueName, submitter, appNameSuffix); + UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, + submitter, appNameSuffix, keepContainersAcrossApplicationAttempts); // Put the UAM into map first before initializing it to avoid additional UAM // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); - RegisterApplicationMasterResponse response = null; + Token amrmToken = null; try { - LOG.info("Creating and registering UAM id {} for application {}", uamId, - appId); - response = uam.createAndRegisterApplicationMaster(registerRequest); + LOG.info("Launching UAM id {} for application {}", uamId, appId); + amrmToken = uam.launchUAM(); } catch (Exception e) { // Add the map earlier and remove here if register failed because we want // to make sure there is only one uam instance per uamId at any given time @@ -219,8 +232,48 @@ public class UnmanagedAMPoolManager extends AbstractService { throw e; } - this.attemptIdMap.put(uamId, uam.getAttemptId()); - return response; + this.appIdMap.put(uamId, uam.getAppId()); + return amrmToken; + } + + /** + * Re-attach to an existing UAM, using the provided uamIdentifier. + * + * @param uamId uam Id + * @param conf configuration for this UAM + * @param appId application id for the UAM + * @param queueName queue of the application + * @param submitter submitter name of the UAM + * @param appNameSuffix application name suffix for the UAM + * @param uamToken UAM token + * @throws YarnException if fails + * @throws IOException if fails + */ + public void reAttachUAM(String uamId, Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix, Token uamToken) + throws YarnException, IOException { + + if (this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " already exists"); + } + UnmanagedApplicationManager uam = + createUAM(conf, appId, queueName, submitter, appNameSuffix, true); + // Put the UAM into map first before initializing it to avoid additional UAM + // for the same uamId being created concurrently + this.unmanagedAppMasterMap.put(uamId, uam); + + try { + LOG.info("Reattaching UAM id {} for application {}", uamId, appId); + uam.reAttachUAM(uamToken); + } catch (Exception e) { + // Add the map earlier and remove here if register failed because we want + // to make sure there is only one uam instance per uamId at any given time + this.unmanagedAppMasterMap.remove(uamId); + throw e; + } + + this.appIdMap.put(uamId, uam.getAppId()); } /** @@ -231,20 +284,42 @@ public class UnmanagedAMPoolManager extends AbstractService { * @param queueName queue of the application * @param submitter submitter name of the application * @param appNameSuffix application name suffix + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * @return the UAM instance */ @VisibleForTesting protected UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { return new UnmanagedApplicationManager(conf, appId, queueName, submitter, - appNameSuffix); + appNameSuffix, keepContainersAcrossApplicationAttempts); + } + + /** + * Register application master for the UAM. + * + * @param uamId uam Id + * @param registerRequest RegisterApplicationMasterRequest + * @return register response + * @throws YarnException if register fails + * @throws IOException if register fails + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + String uamId, RegisterApplicationMasterRequest registerRequest) + throws YarnException, IOException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + LOG.info("Registering UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); + return this.unmanagedAppMasterMap.get(uamId) + .registerApplicationMaster(registerRequest); } /** * AllocateAsync to an UAM. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request AllocateRequest * @param callback callback for response * @throws YarnException if allocate fails @@ -262,7 +337,7 @@ public class UnmanagedAMPoolManager extends AbstractService { /** * Finish an UAM/application. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request FinishApplicationMasterRequest * @return FinishApplicationMasterResponse * @throws YarnException if finishApplicationMaster call fails @@ -274,14 +349,15 @@ public class UnmanagedAMPoolManager extends AbstractService { if (!this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " does not exist"); } - LOG.info("Finishing application for UAM id {} ", uamId); + LOG.info("Finishing UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); FinishApplicationMasterResponse response = this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request); if (response.getIsUnregistered()) { // Only remove the UAM when the unregister finished this.unmanagedAppMasterMap.remove(uamId); - this.attemptIdMap.remove(uamId); + this.appIdMap.remove(uamId); LOG.info("UAM id {} is unregistered", uamId); } return response; @@ -301,7 +377,7 @@ public class UnmanagedAMPoolManager extends AbstractService { /** * Return whether an UAM exists. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @return UAM exists or not */ public boolean hasUAMId(String uamId) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 6531a75c95a..3f4a1100b5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -50,7 +50,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -90,7 +92,6 @@ public class UnmanagedApplicationManager { private AMRequestHandlerThread handlerThread; private ApplicationMasterProtocol rmProxy; private ApplicationId applicationId; - private ApplicationAttemptId attemptId; private String submitter; private String appNameSuffix; private Configuration conf; @@ -101,9 +102,31 @@ public class UnmanagedApplicationManager { private ApplicationClientProtocol rmClient; private long asyncApiPollIntervalMillis; private RecordFactory recordFactory; + private boolean keepContainersAcrossApplicationAttempts; + /* + * This flag is used as an indication that this method launchUAM/reAttachUAM + * is called (and perhaps blocked in initializeUnmanagedAM below due to RM + * connection/failover issue and not finished yet). Set the flag before + * calling the blocking call to RM. + */ + private boolean connectionInitiated; + + /** + * Constructor. + * + * @param conf configuration + * @param appId application Id to use for this UAM + * @param queueName the queue of the UAM + * @param submitter user name of the app + * @param appNameSuffix the app name suffix to use + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery. See {@link ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean)} + */ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, - String queueName, String submitter, String appNameSuffix) { + String queueName, String submitter, String appNameSuffix, + boolean keepContainersAcrossApplicationAttempts) { Preconditions.checkNotNull(conf, "Configuration cannot be null"); Preconditions.checkNotNull(appId, "ApplicationId cannot be null"); Preconditions.checkNotNull(submitter, "App submitter cannot be null"); @@ -116,6 +139,7 @@ public class UnmanagedApplicationManager { this.handlerThread = new AMRequestHandlerThread(); this.requestQueue = new LinkedBlockingQueue<>(); this.rmProxy = null; + this.connectionInitiated = false; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.asyncApiPollIntervalMillis = conf.getLong( @@ -123,45 +147,84 @@ public class UnmanagedApplicationManager { YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, YarnConfiguration. DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); + this.keepContainersAcrossApplicationAttempts = + keepContainersAcrossApplicationAttempts; + } + + /** + * Launch a new UAM in the resource manager. + * + * @return identifier uam identifier + * @throws YarnException if fails + * @throws IOException if fails + */ + public Token launchUAM() + throws YarnException, IOException { + this.connectionInitiated = true; + + // Blocking call to RM + Token amrmToken = + initializeUnmanagedAM(this.applicationId); + + // Creates the UAM connection + createUAMProxy(amrmToken); + return amrmToken; + } + + /** + * Re-attach to an existing UAM in the resource manager. + * + * @param amrmToken the UAM token + * @throws IOException if re-attach fails + * @throws YarnException if re-attach fails + */ + public void reAttachUAM(Token amrmToken) + throws IOException, YarnException { + this.connectionInitiated = true; + + // Creates the UAM connection + createUAMProxy(amrmToken); + } + + protected void createUAMProxy(Token amrmToken) + throws IOException { + this.userUgi = UserGroupInformation.createProxyUser( + this.applicationId.toString(), UserGroupInformation.getCurrentUser()); + this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, + this.userUgi, amrmToken); } /** * Registers this {@link UnmanagedApplicationManager} with the resource * manager. * - * @param request the register request - * @return the register response + * @param request RegisterApplicationMasterRequest + * @return register response * @throws YarnException if register fails * @throws IOException if register fails */ - public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( + public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { - // This need to be done first in this method, because it is used as an - // indication that this method is called (and perhaps blocked due to RM - // connection and not finished yet) + // Save the register request for re-register later this.registerRequest = request; - // attemptId will be available after this call - UnmanagedAMIdentifier identifier = - initializeUnmanagedAM(this.applicationId); - - try { - this.userUgi = UserGroupInformation.createProxyUser( - identifier.getAttemptId().toString(), - UserGroupInformation.getCurrentUser()); - } catch (IOException e) { - LOG.error("Exception while trying to get current user", e); - throw new YarnRuntimeException(e); - } - - this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, - this.userUgi, identifier.getToken()); - - LOG.info("Registering the Unmanaged application master {}", this.attemptId); + // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM. + // We do not expect application already registered exception here + LOG.info("Registering the Unmanaged application master {}", + this.applicationId); RegisterApplicationMasterResponse response = this.rmProxy.registerApplicationMaster(this.registerRequest); + for (Container container : response.getContainersFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing running container " + + container.getId()); + } + for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing NM token for node " + + nmToken.getNodeId()); + } + // Only when register succeed that we start the heartbeat thread this.handlerThread.setUncaughtExceptionHandler( new HeartBeatThreadUncaughtExceptionHandler()); @@ -187,11 +250,11 @@ public class UnmanagedApplicationManager { this.handlerThread.shutdown(); if (this.rmProxy == null) { - if (this.registerRequest != null) { - // This is possible if the async registerApplicationMaster is still + if (this.connectionInitiated) { + // This is possible if the async launchUAM is still // blocked and retrying. Return a dummy response in this case. LOG.warn("Unmanaged AM still not successfully launched/registered yet." - + " Stopping the UAM client thread anyways."); + + " Stopping the UAM heartbeat thread anyways."); return FinishApplicationMasterResponse.newInstance(false); } else { throw new YarnException("finishApplicationMaster should not " @@ -199,7 +262,7 @@ public class UnmanagedApplicationManager { } } return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, - this.registerRequest, this.attemptId); + this.registerRequest, this.applicationId); } /** @@ -212,7 +275,7 @@ public class UnmanagedApplicationManager { public KillApplicationResponse forceKillApplication() throws IOException, YarnException { KillApplicationRequest request = - KillApplicationRequest.newInstance(this.attemptId.getApplicationId()); + KillApplicationRequest.newInstance(this.applicationId); this.handlerThread.shutdown(); @@ -240,29 +303,29 @@ public class UnmanagedApplicationManager { LOG.debug("Interrupted while waiting to put on response queue", ex); } // Two possible cases why the UAM is not successfully registered yet: - // 1. registerApplicationMaster is not called at all. Should throw here. - // 2. registerApplicationMaster is called but hasn't successfully returned. + // 1. launchUAM is not called at all. Should throw here. + // 2. launchUAM is called but hasn't successfully returned. // // In case 2, we have already save the allocate request above, so if the // registration succeed later, no request is lost. if (this.rmProxy == null) { - if (this.registerRequest != null) { + if (this.connectionInitiated) { LOG.info("Unmanaged AM still not successfully launched/registered yet." + " Saving the allocate request and send later."); } else { throw new YarnException( - "AllocateAsync should not be called before createAndRegister"); + "AllocateAsync should not be called before launchUAM"); } } } /** - * Returns the application attempt id of the UAM. + * Returns the application id of the UAM. * - * @return attempt id of the UAM + * @return application id of the UAM */ - public ApplicationAttemptId getAttemptId() { - return this.attemptId; + public ApplicationId getAppId() { + return this.applicationId; } /** @@ -287,15 +350,15 @@ public class UnmanagedApplicationManager { * Launch and initialize an unmanaged AM. First, it creates a new application * on the RM and negotiates a new attempt id. Then it waits for the RM * application attempt state to reach YarnApplicationAttemptState.LAUNCHED - * after which it returns the AM-RM token and the attemptId. + * after which it returns the AM-RM token. * * @param appId application id - * @return the UAM identifier + * @return the UAM token * @throws IOException if initialize fails * @throws YarnException if initialize fails */ - protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId) - throws IOException, YarnException { + protected Token initializeUnmanagedAM( + ApplicationId appId) throws IOException, YarnException { try { UserGroupInformation appSubmitter = UserGroupInformation.createRemoteUser(this.submitter); @@ -306,13 +369,12 @@ public class UnmanagedApplicationManager { submitUnmanagedApp(appId); // Monitor the application attempt to wait for launch state - ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId, + monitorCurrentAppAttempt(appId, EnumSet.of(YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING, YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED), YarnApplicationAttemptState.LAUNCHED); - this.attemptId = attemptReport.getApplicationAttemptId(); - return getUAMIdentifier(); + return getUAMToken(); } finally { this.rmClient = null; } @@ -343,6 +405,8 @@ public class UnmanagedApplicationManager { submitRequest.setApplicationSubmissionContext(context); context.setUnmanagedAM(true); + context.setKeepContainersAcrossApplicationAttempts( + this.keepContainersAcrossApplicationAttempts); LOG.info("Submitting unmanaged application {}", appId); this.rmClient.submitApplication(submitRequest); @@ -374,8 +438,10 @@ public class UnmanagedApplicationManager { if (appStates.contains(state)) { if (state != YarnApplicationState.ACCEPTED) { throw new YarnRuntimeException( - "Received non-accepted application state: " + state - + ". Application " + appId + " not the first attempt?"); + "Received non-accepted application state: " + state + " for " + + appId + ". This is likely because this is not the first " + + "app attempt in home sub-cluster, and AMRMProxy HA " + + "(yarn.nodemanager.amrmproxy.ha.enable) is not enabled."); } appAttemptId = getApplicationReport(appId).getCurrentApplicationAttemptId(); @@ -415,25 +481,25 @@ public class UnmanagedApplicationManager { } /** - * Gets the identifier of the unmanaged AM. + * Gets the amrmToken of the unmanaged AM. * - * @return the identifier of the unmanaged AM. + * @return the amrmToken of the unmanaged AM. * @throws IOException if getApplicationReport fails * @throws YarnException if getApplicationReport fails */ - protected UnmanagedAMIdentifier getUAMIdentifier() + protected Token getUAMToken() throws IOException, YarnException { Token token = null; org.apache.hadoop.yarn.api.records.Token amrmToken = - getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken(); + getApplicationReport(this.applicationId).getAMRMToken(); if (amrmToken != null) { token = ConverterUtils.convertFromYarn(amrmToken, (Text) null); } else { LOG.warn( "AMRMToken not found in the application report for application: {}", - this.attemptId.getApplicationId()); + this.applicationId); } - return new UnmanagedAMIdentifier(this.attemptId, token); + return token; } private ApplicationReport getApplicationReport(ApplicationId appId) @@ -444,29 +510,6 @@ public class UnmanagedApplicationManager { return this.rmClient.getApplicationReport(request).getApplicationReport(); } - /** - * Data structure that encapsulates the application attempt identifier and the - * AMRMTokenIdentifier. Make it public because clients with HA need it. - */ - public static class UnmanagedAMIdentifier { - private ApplicationAttemptId attemptId; - private Token token; - - public UnmanagedAMIdentifier(ApplicationAttemptId attemptId, - Token token) { - this.attemptId = attemptId; - this.token = token; - } - - public ApplicationAttemptId getAttemptId() { - return this.attemptId; - } - - public Token getToken() { - return this.token; - } - } - /** * Data structure that encapsulates AllocateRequest and AsyncCallback * instance. @@ -549,8 +592,10 @@ public class UnmanagedApplicationManager { } request.setResponseId(lastResponseId); + AllocateResponse response = AMRMClientUtils.allocateWithReRegister( - request, rmProxy, registerRequest, attemptId); + request, rmProxy, registerRequest, applicationId); + if (response == null) { throw new YarnException("Null allocateResponse from allocate"); } @@ -578,18 +623,17 @@ public class UnmanagedApplicationManager { LOG.debug("Interrupted while waiting for queue", ex); } } catch (IOException ex) { - LOG.warn( - "IO Error occurred while processing heart beat for " + attemptId, - ex); + LOG.warn("IO Error occurred while processing heart beat for " + + applicationId, ex); } catch (Throwable ex) { LOG.warn( - "Error occurred while processing heart beat for " + attemptId, + "Error occurred while processing heart beat for " + applicationId, ex); } } LOG.info("UnmanagedApplicationManager has been stopped for {}. " - + "AMRequestHandlerThread thread is exiting", attemptId); + + "AMRequestHandlerThread thread is exiting", applicationId); } } @@ -600,8 +644,8 @@ public class UnmanagedApplicationManager { implements UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { - LOG.error("Heartbeat thread {} for application attempt {} crashed!", - t.getName(), attemptId, e); + LOG.error("Heartbeat thread {} for application {} crashed!", + t.getName(), applicationId, e); } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java index 9f15d905eb6..e1f08e3e1b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -63,16 +63,16 @@ public final class AMRMClientUtils { /** * Handle ApplicationNotRegistered exception and re-register. * - * @param attemptId app attemptId + * @param appId application Id * @param rmProxy RM proxy instance * @param registerRequest the AM re-register request * @throws YarnException if re-register fails */ public static void handleNotRegisteredExceptionAndReRegister( - ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy, + ApplicationId appId, ApplicationMasterProtocol rmProxy, RegisterApplicationMasterRequest registerRequest) throws YarnException { LOG.info("App attempt {} not registered, most likely due to RM failover. " - + " Trying to re-register.", attemptId); + + " Trying to re-register.", appId); try { rmProxy.registerApplicationMaster(registerRequest); } catch (Exception e) { @@ -93,25 +93,24 @@ public final class AMRMClientUtils { * @param request allocate request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return allocate response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static AllocateResponse allocateWithReRegister(AllocateRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.allocate(request); } catch (ApplicationMasterNotRegisteredException e) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // reset responseId after re-register request.setResponseId(0); // retry allocate - return allocateWithReRegister(request, rmProxy, registerRequest, - attemptId); + return allocateWithReRegister(request, rmProxy, registerRequest, appId); } } @@ -123,23 +122,22 @@ public final class AMRMClientUtils { * @param request finishApplicationMaster request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return finishApplicationMaster response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static FinishApplicationMasterResponse finishAMWithReRegister( FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.finishApplicationMaster(request); } catch (ApplicationMasterNotRegisteredException ex) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // retry finishAM after re-register - return finishAMWithReRegister(request, rmProxy, registerRequest, - attemptId); + return finishAMWithReRegister(request, rmProxy, registerRequest, appId); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index c49d6e890db..c50999423d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -171,10 +172,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, LoggerFactory.getLogger(MockResourceManagerFacade.class); private HashSet applicationMap = new HashSet<>(); - private HashMap> applicationContainerIdMap = - new HashMap>(); - private HashMap allocatedContainerMap = - new HashMap(); + private HashSet keepContainerOnUams = new HashSet<>(); + private HashMap> + applicationContainerIdMap = new HashMap<>(); private AtomicInteger containerIndex = new AtomicInteger(0); private Configuration conf; private int subClusterId; @@ -215,7 +215,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, this.isRunning = mode; } - private static String getAppIdentifier() throws IOException { + private static ApplicationAttemptId getAppIdentifier() throws IOException { AMRMTokenIdentifier result = null; UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); Set tokenIds = remoteUgi.getTokenIdentifiers(); @@ -225,7 +225,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, break; } } - return result != null ? result.getApplicationAttemptId().toString() : ""; + return result != null ? result.getApplicationAttemptId() + : ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0); } private void validateRunning() throws ConnectException { @@ -240,19 +241,32 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, throws YarnException, IOException { validateRunning(); - - String amrmToken = getAppIdentifier(); - LOG.info("Registering application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Registering application attempt: " + attemptId); shouldReRegisterNext = false; + List containersFromPreviousAttempt = null; + synchronized (applicationContainerIdMap) { - if (applicationContainerIdMap.containsKey(amrmToken)) { - throw new InvalidApplicationMasterRequestException( - AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + if (applicationContainerIdMap.containsKey(attemptId)) { + if (keepContainerOnUams.contains(attemptId.getApplicationId())) { + // For UAM with the keepContainersFromPreviousAttempt flag, return all + // running containers + containersFromPreviousAttempt = new ArrayList<>(); + for (ContainerId containerId : applicationContainerIdMap + .get(attemptId)) { + containersFromPreviousAttempt.add(Container.newInstance(containerId, + null, null, null, null, null)); + } + } else { + throw new InvalidApplicationMasterRequestException( + AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + } + } else { + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(attemptId, new ArrayList()); } - // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(amrmToken, new ArrayList()); } // Make sure we wait for certain test cases last in the method @@ -272,7 +286,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } return RegisterApplicationMasterResponse.newInstance(null, null, null, null, - null, request.getHost(), null); + containersFromPreviousAttempt, request.getHost(), null); } @Override @@ -282,8 +296,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); - String amrmToken = getAppIdentifier(); - LOG.info("Finishing application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Finishing application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -293,12 +307,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, synchronized (applicationContainerIdMap) { // Remove the containers that were being tracked for this application - Assert.assertTrue("The application id is NOT registered: " + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.remove(amrmToken); - for (ContainerId c : ids) { - allocatedContainerMap.remove(c); - } + Assert.assertTrue("The application id is NOT registered: " + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + applicationContainerIdMap.remove(attemptId); } return FinishApplicationMasterResponse.newInstance( @@ -328,8 +339,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, + "askList and releaseList in the same heartbeat"); } - String amrmToken = getAppIdentifier(); - LOG.info("Allocate from application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Allocate from application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -361,16 +372,16 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, // will need it in future Assert.assertTrue( "The application id is Not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List ids = applicationContainerIdMap.get(attemptId); ids.add(containerId); - this.allocatedContainerMap.put(containerId, container); } } } } + List completedList = new ArrayList<>(); if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); @@ -378,9 +389,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, Assert .assertTrue( "The application id is not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List ids = applicationContainerIdMap.get(attemptId); for (ContainerId id : request.getReleaseList()) { boolean found = false; @@ -396,18 +407,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, + conf.get("AMRMTOKEN"), found); ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = ContainerId.newInstance( - getApplicationAttemptId(1), containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); + completedList.add( + ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0)); } } } @@ -418,8 +419,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, - new ArrayList(), containerList, + return AllocateResponse.newInstance(0, completedList, containerList, new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, new ArrayList(), newAMRMToken, new ArrayList(), null); @@ -438,6 +438,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, report.setApplicationId(request.getApplicationId()); report.setCurrentApplicationAttemptId( ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], "")); response.setApplicationReport(report); return response; } @@ -481,6 +482,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } LOG.info("Application submitted: " + appId); applicationMap.add(appId); + + if (request.getApplicationSubmissionContext().getUnmanagedAM() + || request.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + keepContainerOnUams.add(appId); + } return SubmitApplicationResponse.newInstance(); } @@ -497,6 +504,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, throw new ApplicationNotFoundException( "Trying to kill an absent application: " + appId); } + keepContainerOnUams.remove(appId); } LOG.info("Force killing application: " + appId); return KillApplicationResponse.newInstance(true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java new file mode 100644 index 00000000000..42be851512a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for FederationRegistryClient. + */ +public class TestFederationRegistryClient { + private Configuration conf; + private UserGroupInformation user; + private RegistryOperations registry; + private FederationRegistryClient registryClient; + + @Before + public void setup() throws Exception { + this.conf = new YarnConfiguration(); + + this.registry = new FSRegistryOperationsService(); + this.registry.init(this.conf); + this.registry.start(); + + this.user = UserGroupInformation.getCurrentUser(); + this.registryClient = + new FederationRegistryClient(this.conf, this.registry, this.user); + this.registryClient.cleanAllApplications(); + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + } + + @After + public void breakDown() { + registryClient.cleanAllApplications(); + Assert.assertEquals(0, registryClient.getAllApplications().size()); + registry.stop(); + } + + @Test + public void testBasicCase() { + ApplicationId appId = ApplicationId.newInstance(0, 0); + String scId1 = "subcluster1"; + String scId2 = "subcluster2"; + + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token()); + this.registryClient.writeAMRMTokenForUAM(appId, scId2, + new Token()); + // Duplicate entry, should overwrite + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token()); + + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + Assert.assertEquals(2, + this.registryClient.loadStateFromRegistry(appId).size()); + + this.registryClient.removeAppFromRegistry(appId); + + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + Assert.assertEquals(0, + this.registryClient.loadStateFromRegistry(appId).size()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 9159cf75150..5848d3f8b74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -65,7 +65,7 @@ public class TestUnmanagedApplicationManager { ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); uam = new TestableUnmanagedApplicationManager(conf, - attemptId.getApplicationId(), null, "submitter", "appNameSuffix"); + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); } protected void waitForCallBackCountAndCheckZeroPending( @@ -88,7 +88,8 @@ public class TestUnmanagedApplicationManager { public void testBasicUsage() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, @@ -102,11 +103,48 @@ public class TestUnmanagedApplicationManager { attemptId); } + /* + * Test re-attaching of an existing UAM. This is for HA of UAM client. + */ + @Test(timeout = 5000) + public void testUAMReAttach() + throws YarnException, IOException, InterruptedException { + + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + MockResourceManagerFacade rmProxy = uam.getRMProxy(); + uam = new TestableUnmanagedApplicationManager(conf, + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); + uam.setRMProxy(rmProxy); + + reAttachUAM(null, attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 2); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + @Test(timeout = 5000) public void testReRegister() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.setShouldReRegisterNext(); @@ -137,7 +175,8 @@ public class TestUnmanagedApplicationManager { @Override public void run() { try { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null), attemptId); } catch (Exception e) { @@ -221,7 +260,8 @@ public class TestUnmanagedApplicationManager { @Test public void testForceKill() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.forceKillApplication(); @@ -241,19 +281,40 @@ public class TestUnmanagedApplicationManager { return ugi; } - protected RegisterApplicationMasterResponse - createAndRegisterApplicationMaster( - final RegisterApplicationMasterRequest request, - ApplicationAttemptId appAttemptId) - throws YarnException, IOException, InterruptedException { + protected Token launchUAM( + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + return getUGIWithToken(appAttemptId) + .doAs(new PrivilegedExceptionAction>() { + @Override + public Token run() throws Exception { + return uam.launchUAM(); + } + }); + } + + protected void reAttachUAM(final Token uamToken, + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction() { + @Override + public Token run() throws Exception { + uam.reAttachUAM(uamToken); + return null; + } + }); + } + + protected RegisterApplicationMasterResponse registerApplicationMaster( + final RegisterApplicationMasterRequest request, + ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { return getUGIWithToken(appAttemptId).doAs( new PrivilegedExceptionAction() { @Override public RegisterApplicationMasterResponse run() throws YarnException, IOException { - RegisterApplicationMasterResponse response = - uam.createAndRegisterApplicationMaster(request); - return response; + return uam.registerApplicationMaster(request); } }); } @@ -311,8 +372,9 @@ public class TestUnmanagedApplicationManager { public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { - super(conf, appId, queueName, submitter, appNameSuffix); + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + super(conf, appId, queueName, submitter, appNameSuffix, + keepContainersAcrossApplicationAttempts); } @SuppressWarnings("unchecked") @@ -330,6 +392,14 @@ public class TestUnmanagedApplicationManager { rmProxy.setShouldReRegisterNext(); } } + + public MockResourceManagerFacade getRMProxy() { + return rmProxy; + } + + public void setRMProxy(MockResourceManagerFacade proxy) { + this.rmProxy = proxy; + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java index c355a8b4f42..92afcb7ad12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -67,4 +69,18 @@ public interface AMRMProxyApplicationContext { */ Context getNMCotext(); + /** + * Gets the credentials of this application. + * + * @return the credentials. + */ + Credentials getCredentials(); + + /** + * Gets the registry client. + * + * @return the registry. + */ + RegistryOperations getRegistryClient(); + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java index 9938b370b24..8a0209507b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -42,6 +44,8 @@ public class AMRMProxyApplicationContextImpl implements private Integer localTokenKeyId; private Token amrmToken; private Token localToken; + private Credentials credentials; + private RegistryOperations registry; /** * Create an instance of the AMRMProxyApplicationContext. @@ -52,17 +56,23 @@ public class AMRMProxyApplicationContextImpl implements * @param user user name of the application * @param amrmToken amrmToken issued by RM * @param localToken amrmToken issued by AMRMProxy + * @param credentials application credentials + * @param registry Yarn Registry client */ - public AMRMProxyApplicationContextImpl(Context nmContext, - Configuration conf, ApplicationAttemptId applicationAttemptId, - String user, Token amrmToken, - Token localToken) { + @SuppressWarnings("checkstyle:parameternumber") + public AMRMProxyApplicationContextImpl(Context nmContext, Configuration conf, + ApplicationAttemptId applicationAttemptId, String user, + Token amrmToken, + Token localToken, Credentials credentials, + RegistryOperations registry) { this.nmContext = nmContext; this.conf = conf; this.applicationAttemptId = applicationAttemptId; this.user = user; this.amrmToken = amrmToken; this.localToken = localToken; + this.credentials = credentials; + this.registry = registry; } @Override @@ -88,11 +98,14 @@ public class AMRMProxyApplicationContextImpl implements /** * Sets the application's AMRMToken. * - * @param amrmToken amrmToken issued by RM + * @param amrmToken the new amrmToken from RM + * @return whether the saved token is updated to a different value */ - public synchronized void setAMRMToken( + public synchronized boolean setAMRMToken( Token amrmToken) { + Token oldValue = this.amrmToken; this.amrmToken = amrmToken; + return !this.amrmToken.equals(oldValue); } @Override @@ -134,4 +147,14 @@ public class AMRMProxyApplicationContextImpl implements public Context getNMCotext() { return nmContext; } + + @Override + public Credentials getCredentials() { + return this.credentials; + } + + @Override + public RegistryOperations getRegistryClient() { + return this.registry; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index d63b2cf589b..ebd85bf44f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -34,12 +34,13 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -60,15 +61,19 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +87,7 @@ import com.google.common.base.Preconditions; * pipeline is a chain of interceptor instances that can inspect and modify the * request/response as needed. */ -public class AMRMProxyService extends AbstractService implements +public class AMRMProxyService extends CompositeService implements ApplicationMasterProtocol { private static final Logger LOG = LoggerFactory .getLogger(AMRMProxyService.class); @@ -96,6 +101,7 @@ public class AMRMProxyService extends AbstractService implements private InetSocketAddress listenerEndpoint; private AMRMProxyTokenSecretManager secretManager; private Map applPipelineMap; + private RegistryOperations registry; /** * Creates an instance of the service. @@ -118,10 +124,23 @@ public class AMRMProxyService extends AbstractService implements @Override protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); this.secretManager = new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore()); this.secretManager.init(conf); + + // Both second app attempt and NM restart within Federation need registry + if (conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED) + || conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) { + this.registry = FederationStateStoreFacade.createInstance(conf, + YarnConfiguration.YARN_REGISTRY_CLASS, + YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS, + RegistryOperations.class); + addService(this.registry); + } + + super.serviceInit(conf); } @Override @@ -203,6 +222,8 @@ public class AMRMProxyService extends AbstractService implements amrmToken = new Token<>(); amrmToken.decodeFromUrlString( new String(contextEntry.getValue(), "UTF-8")); + // Clear the service field, as if RM just issued the token + amrmToken.setService(new Text()); } } @@ -214,12 +235,36 @@ public class AMRMProxyService extends AbstractService implements throw new IOException("No user found for app attempt " + attemptId); } + // Regenerate the local AMRMToken for the AM Token localToken = this.secretManager.createAndGetAMRMToken(attemptId); + // Retrieve the AM container credentials from NM context + Credentials amCred = null; + for (Container container : this.nmContext.getContainers().values()) { + LOG.debug("From NM Context container " + container.getContainerId()); + if (container.getContainerId().getApplicationAttemptId().equals( + attemptId) && container.getContainerTokenIdentifier() != null) { + LOG.debug("Container type " + + container.getContainerTokenIdentifier().getContainerType()); + if (container.getContainerTokenIdentifier() + .getContainerType() == ContainerType.APPLICATION_MASTER) { + LOG.info("AM container {} found in context, has credentials: {}", + container.getContainerId(), + (container.getCredentials() != null)); + amCred = container.getCredentials(); + } + } + } + if (amCred == null) { + LOG.error("No credentials found for AM container of {}. " + + "Yarn registry access might not work", attemptId); + } + + // Create the intercepter pipeline for the AM initializePipeline(attemptId, user, amrmToken, localToken, - entry.getValue(), true); - } catch (Exception e) { + entry.getValue(), true, amCred); + } catch (IOException e) { LOG.error("Exception when recovering " + attemptId + ", removing it from NMStateStore and move on", e); this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId); @@ -326,7 +371,7 @@ public class AMRMProxyService extends AbstractService implements initializePipeline(appAttemptId, containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken, - localToken, null, false); + localToken, null, false, credentials); } /** @@ -342,7 +387,8 @@ public class AMRMProxyService extends AbstractService implements protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, Token localToken, - Map recoveredDataMap, boolean isRecovery) { + Map recoveredDataMap, boolean isRecovery, + Credentials credentials) { RequestInterceptorChainWrapper chainWrapper = null; synchronized (applPipelineMap) { if (applPipelineMap @@ -404,8 +450,9 @@ public class AMRMProxyService extends AbstractService implements try { RequestInterceptor interceptorChain = this.createRequestInterceptorChain(); - interceptorChain.init(createApplicationMasterContext(this.nmContext, - applicationAttemptId, user, amrmToken, localToken)); + interceptorChain.init( + createApplicationMasterContext(this.nmContext, applicationAttemptId, + user, amrmToken, localToken, credentials, this.registry)); if (isRecovery) { if (recoveredDataMap == null) { throw new YarnRuntimeException( @@ -497,14 +544,12 @@ public class AMRMProxyService extends AbstractService implements allocateResponse.setAMRMToken(null); org.apache.hadoop.security.token.Token newToken = - new org.apache.hadoop.security.token.Token( - token.getIdentifier().array(), token.getPassword().array(), - new Text(token.getKind()), new Text(token.getService())); + ConverterUtils.convertFromYarn(token, (Text) null); - context.setAMRMToken(newToken); - - // Update the AMRMToken in context map in NM state store - if (this.nmContext.getNMStateStore() != null) { + // Update the AMRMToken in context map, and in NM state store if it is + // different + if (context.setAMRMToken(newToken) + && this.nmContext.getNMStateStore() != null) { try { this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY, @@ -547,10 +592,12 @@ public class AMRMProxyService extends AbstractService implements private AMRMProxyApplicationContext createApplicationMasterContext( Context context, ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, - Token localToken) { + Token localToken, Credentials credentials, + RegistryOperations registryImpl) { AMRMProxyApplicationContextImpl appContext = new AMRMProxyApplicationContextImpl(context, getConfig(), - applicationAttemptId, user, amrmToken, localToken); + applicationAttemptId, user, amrmToken, localToken, credentials, + registryImpl); return appContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 33cfca355b6..ef5e0619240 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -34,6 +34,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -42,6 +44,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -56,17 +59,20 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +151,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private UserGroupInformation appOwner; + private FederationRegistryClient registryClient; + /** * Creates an instance of the FederationInterceptor class. */ @@ -179,6 +187,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } catch (Exception ex) { throw new YarnRuntimeException(ex); } + // Add all app tokens for Yarn Registry access + if (this.registryClient != null && appContext.getCredentials() != null) { + this.appOwner.addCredentials(appContext.getCredentials()); + } this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); @@ -192,6 +204,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.uamPool.init(conf); this.uamPool.start(); + + if (appContext.getRegistryClient() != null) { + this.registryClient = new FederationRegistryClient(conf, + appContext.getRegistryClient(), this.appOwner); + } } /** @@ -250,20 +267,27 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ this.amRegistrationResponse = this.homeRM.registerApplicationMaster(request); + if (this.amRegistrationResponse + .getContainersFromPreviousAttempts() != null) { + cacheAllocatedContainers( + this.amRegistrationResponse.getContainersFromPreviousAttempts(), + this.homeSubClusterId); + } + + ApplicationId appId = + getApplicationContext().getApplicationAttemptId().getApplicationId(); + reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId); // the queue this application belongs will be used for getting // AMRMProxy policy from state store. String queue = this.amRegistrationResponse.getQueue(); if (queue == null) { - LOG.warn("Received null queue for application " - + getApplicationContext().getApplicationAttemptId().getApplicationId() - + " from home sub-cluster. Will use default queue name " + LOG.warn("Received null queue for application " + appId + + " from home subcluster. Will use default queue name " + YarnConfiguration.DEFAULT_QUEUE_NAME + " for getting AMRMProxyPolicy"); } else { - LOG.info("Application " - + getApplicationContext().getApplicationAttemptId().getApplicationId() - + " belongs to queue " + queue); + LOG.info("Application " + appId + " belongs to queue " + queue); } // Initialize the AMRMProxyPolicy @@ -304,7 +328,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister( requests.get(this.homeSubClusterId), this.homeRM, this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId()); + getApplicationContext().getApplicationAttemptId().getApplicationId()); // Notify policy of home response try { @@ -393,8 +417,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // request to the home resource manager on this thread. FinishApplicationMasterResponse homeResponse = AMRMClientUtils.finishAMWithReRegister(request, this.homeRM, - this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId()); + this.amRegistrationRequest, getApplicationContext() + .getApplicationAttemptId().getApplicationId()); if (subClusterIds.size() > 0) { // Wait for other sub-cluster resource managers to return the @@ -425,6 +449,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (failedToUnRegister) { homeResponse.setIsUnregistered(false); + } else { + // Clean up UAMs only when the app finishes successfully, so that no more + // attempt will be launched. + this.uamPool.stop(); + if (this.registryClient != null) { + this.registryClient.removeAppFromRegistry(getApplicationContext() + .getApplicationAttemptId().getApplicationId()); + } } return homeResponse; } @@ -442,9 +474,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ @Override public void shutdown() { - if (this.uamPool != null) { - this.uamPool.stop(); - } + // Do not stop uamPool service and kill UAMs here because of possible second + // app attempt if (threadpool != null) { try { threadpool.shutdown(); @@ -455,6 +486,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { super.shutdown(); } + /** + * Only for unit test cleanup. + */ + @VisibleForTesting + protected void cleanupRegistry() { + if (this.registryClient != null) { + this.registryClient.cleanAllApplications(); + } + } + /** * Create the UAM pool manager for secondary sub-clsuters. For unit test to * override. @@ -486,6 +527,120 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } + private void mergeRegisterResponse( + RegisterApplicationMasterResponse homeResponse, + RegisterApplicationMasterResponse otherResponse) { + + if (!isNullOrEmpty(otherResponse.getContainersFromPreviousAttempts())) { + if (!isNullOrEmpty(homeResponse.getContainersFromPreviousAttempts())) { + homeResponse.getContainersFromPreviousAttempts() + .addAll(otherResponse.getContainersFromPreviousAttempts()); + } else { + homeResponse.setContainersFromPreviousAttempts( + otherResponse.getContainersFromPreviousAttempts()); + } + } + + if (!isNullOrEmpty(otherResponse.getNMTokensFromPreviousAttempts())) { + if (!isNullOrEmpty(homeResponse.getNMTokensFromPreviousAttempts())) { + homeResponse.getNMTokensFromPreviousAttempts() + .addAll(otherResponse.getNMTokensFromPreviousAttempts()); + } else { + homeResponse.setNMTokensFromPreviousAttempts( + otherResponse.getNMTokensFromPreviousAttempts()); + } + } + } + + /** + * Try re-attach to all existing and running UAMs in secondary sub-clusters + * launched by previous application attempts if any. All running containers in + * the UAMs will be combined into the registerResponse. For the first attempt, + * the registry will be empty for this application and thus no-op here. + */ + protected void reAttachUAMAndMergeRegisterResponse( + RegisterApplicationMasterResponse homeResponse, + final ApplicationId appId) { + + if (this.registryClient == null) { + // Both AMRMProxy HA and NM work preserving restart is not enabled + LOG.warn("registryClient is null, skip attaching existing UAM if any"); + return; + } + + // Load existing running UAMs from the previous attempts from + // registry, if any + Map> uamMap = + this.registryClient.loadStateFromRegistry(appId); + if (uamMap.size() == 0) { + LOG.info("No existing UAM for application {} found in Yarn Registry", + appId); + return; + } + LOG.info("Found {} existing UAMs for application {} in Yarn Registry. " + + "Reattaching in parallel", uamMap.size(), appId); + + ExecutorCompletionService + completionService = new ExecutorCompletionService<>(threadpool); + + for (Entry> entry : uamMap.entrySet()) { + final SubClusterId subClusterId = + SubClusterId.newInstance(entry.getKey()); + final Token amrmToken = entry.getValue(); + + completionService + .submit(new Callable() { + @Override + public RegisterApplicationMasterResponse call() throws Exception { + RegisterApplicationMasterResponse response = null; + try { + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId.getId()); + + uamPool.reAttachUAM(subClusterId.getId(), config, appId, + amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), homeSubClusterId.getId(), + amrmToken); + + response = uamPool.registerApplicationMaster( + subClusterId.getId(), amRegistrationRequest); + + if (response != null + && response.getContainersFromPreviousAttempts() != null) { + cacheAllocatedContainers( + response.getContainersFromPreviousAttempts(), + subClusterId); + } + LOG.info("UAM {} reattached for {}", subClusterId, appId); + } catch (Throwable e) { + LOG.error( + "Reattaching UAM " + subClusterId + " failed for " + appId, + e); + } + return response; + } + }); + } + + // Wait for the re-attach responses + for (int i = 0; i < uamMap.size(); i++) { + try { + Future future = + completionService.take(); + RegisterApplicationMasterResponse registerResponse = future.get(); + if (registerResponse != null) { + LOG.info("Merging register response for {}", appId); + mergeRegisterResponse(homeResponse, registerResponse); + } + } catch (Exception e) { + LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e); + } + } + } + private SubClusterId getSubClusterForNode(String nodeName) { SubClusterId subClusterId = null; try { @@ -655,6 +810,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor { responses.add(response); } + // Save the new AMRMToken for the UAM in registry if present + if (response.getAMRMToken() != null) { + Token newToken = ConverterUtils + .convertFromYarn(response.getAMRMToken(), (Text) null); + // Update the token in registry + if (registryClient != null) { + registryClient + .writeAMRMTokenForUAM( + getApplicationContext().getApplicationAttemptId() + .getApplicationId(), + subClusterId.getId(), newToken); + } + } + // Notify policy of secondary sub-cluster responses try { policyInterpreter.notifyOfResponse(subClusterId, response); @@ -714,20 +883,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor { subClusterId); RegisterApplicationMasterResponse uamResponse = null; + Token token = null; try { // For appNameSuffix, use subClusterId of the home sub-cluster - uamResponse = uamPool.createAndRegisterNewUAM(subClusterId, - registerRequest, config, + token = uamPool.launchUAM(subClusterId, config, appContext.getApplicationAttemptId().getApplicationId(), amRegistrationResponse.getQueue(), appContext.getUser(), - homeSubClusterId.toString()); + homeSubClusterId.toString(), registryClient != null); + + uamResponse = uamPool.registerApplicationMaster(subClusterId, + registerRequest); } catch (Throwable e) { LOG.error("Failed to register application master: " + subClusterId + " Application: " + appContext.getApplicationAttemptId(), e); } return new RegisterApplicationMasterResponseInfo(uamResponse, - SubClusterId.newInstance(subClusterId)); + SubClusterId.newInstance(subClusterId), token); } }); } @@ -752,6 +924,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { + getApplicationContext().getApplicationAttemptId()); successfulRegistrations.put(uamResponse.getSubClusterId(), uamResponse.getResponse()); + + if (registryClient != null) { + registryClient.writeAMRMTokenForUAM( + getApplicationContext().getApplicationAttemptId() + .getApplicationId(), + uamResponse.getSubClusterId().getId(), + uamResponse.getUamToken()); + } } } catch (Exception e) { LOG.warn("Failed to register unmanaged application master: " @@ -1087,11 +1267,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private static class RegisterApplicationMasterResponseInfo { private RegisterApplicationMasterResponse response; private SubClusterId subClusterId; + private Token uamToken; RegisterApplicationMasterResponseInfo( - RegisterApplicationMasterResponse response, SubClusterId subClusterId) { + RegisterApplicationMasterResponse response, SubClusterId subClusterId, + Token uamToken) { this.response = response; this.subClusterId = subClusterId; + this.uamToken = uamToken; } public RegisterApplicationMasterResponse getResponse() { @@ -1101,6 +1284,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public SubClusterId getSubClusterId() { return subClusterId; } + + public Token getUamToken() { + return uamToken; + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 6f993a431b8..de3db6e70a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -353,10 +353,6 @@ public class ContainerManagerImpl extends CompositeService implements rsrcLocalizationSrvc.recoverLocalizedResources( stateStore.loadLocalizationState()); - if (this.amrmProxyEnabled) { - this.getAMRMProxyService().recover(); - } - RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); for (ContainerManagerApplicationProto proto : appsState.getApplications()) { @@ -373,6 +369,11 @@ public class ContainerManagerImpl extends CompositeService implements recoverContainer(rcs); } + // Recovery AMRMProxy state after apps and containers are recovered + if (this.amrmProxyEnabled) { + this.getAMRMProxyService().recover(); + } + //Dispatching the RECOVERY_COMPLETED event through the dispatcher //so that all the paused, scheduled and queued containers will //be scheduled for execution on availability of resources. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 3afbce76d56..8fa2056eb12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -179,6 +180,15 @@ public abstract class BaseAMRMProxyTest { return new NMContext(null, null, null, null, stateStore, false, this.conf); } + protected List getCompletedContainerIds( + List containerStatus) { + List ret = new ArrayList<>(); + for (ContainerStatus status : containerStatus) { + ret.add(status.getContainerId()); + } + return ret; + } + /** * This helper method will invoke the specified function in parallel for each * end point in the specified list using a thread pool and return the @@ -623,7 +633,7 @@ public abstract class BaseAMRMProxyTest { */ public void initApp(ApplicationAttemptId applicationId, String user) { super.initializePipeline(applicationId, user, - new Token(), null, null, false); + new Token(), null, null, false, null); } public void stopApp(ApplicationId applicationId) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 937ede59d7a..b955311bb9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -444,7 +444,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2); getAMRMProxyService().initializePipeline(applicationAttemptId, user, - new Token(), null, null, false); + new Token(), null, null, false, null); RequestInterceptorChainWrapper chain2 = getAMRMProxyService().getPipelines().get(appId); @@ -531,16 +531,14 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - // The way the mock resource manager is setup, it will return the containers - // that were released in the response. This is done because the UAMs run - // asynchronously and we need to if all the resource managers received the - // release it. The containers sent by the mock resource managers will be + // We need to make sure all the resource managers received the + // release list. The containers sent by the mock resource managers will be // aggregated and returned back to us and we can assert if all the release // lists reached the sub-clusters - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + List containersForReleasedContainerIds = new ArrayList<>(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -554,8 +552,9 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " + Integer.toString(allocateResponse.getAllocatedContainers() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 3db0e351e0e..aa7ed697b87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -19,16 +19,20 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -59,6 +63,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -79,7 +87,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { private TestableFederationInterceptor interceptor; private MemoryFederationStateStore stateStore; + private NMStateStoreService nmStateStore; + private RegistryOperations registry; + private Context nmContext; private int testAppId; private ApplicationAttemptId attemptId; @@ -93,15 +104,28 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); + nmStateStore = new NMMemoryStateStoreService(); + nmStateStore.init(getConf()); + nmStateStore.start(); + + registry = new FSRegistryOperationsService(); + registry.init(getConf()); + registry.start(); + testAppId = 1; attemptId = getApplicationAttemptId(testAppId); - interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), - attemptId, "test-user", null, null)); + nmContext = + new NMContext(null, null, null, null, nmStateStore, false, getConf()); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), + attemptId, "test-user", null, null, null, registry)); + interceptor.cleanupRegistry(); } @Override public void tearDown() { + interceptor.cleanupRegistry(); interceptor.shutdown(); + registry.stop(); super.tearDown(); } @@ -207,18 +231,17 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); - // The way the mock resource manager is setup, it will return the containers - // that were released in the allocated containers. The release request will - // be split and handled by the corresponding UAM. The release containers - // returned by the mock resource managers will be aggregated and returned - // back to us and we can check if total request size and returned size are - // the same - List containersForReleasedContainerIds = - new ArrayList(); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + // The release request will be split and handled by the corresponding UAM. + // The release containers returned by the mock resource managers will be + // aggregated and returned back to us and we can check if total request size + // and returned size are the same + List containersForReleasedContainerIds = + new ArrayList(); + List newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in the original request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -228,11 +251,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { allocateResponse = interceptor.allocate(Records.newRecord(AllocateRequest.class)); Assert.assertNotNull(allocateResponse); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); LOG.info("Total number of containers received: " + Integer.toString(containersForReleasedContainerIds.size())); Thread.sleep(10); @@ -547,4 +571,74 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { Assert.assertEquals(1, response.getUpdatedContainers().size()); Assert.assertEquals(1, response.getUpdateErrors().size()); } + + @Test + public void testSecondAttempt() throws Exception { + ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); + userInfo.getUser().doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + for (Container c : containers) { + System.out.println(c.getId() + " ha"); + } + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Preserve the mock RM instances for secondaries + ConcurrentHashMap secondaries = + interceptor.getSecondaryRMs(); + + // Increase the attemptId and create a new intercepter instance for it + attemptId = ApplicationAttemptId.newInstance( + attemptId.getApplicationId(), attemptId.getAttemptId() + 1); + + interceptor = new TestableFederationInterceptor(null, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registry)); + registerResponse = interceptor.registerApplicationMaster(registerReq); + + // Should re-attach secondaries and get the three running containers + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(numberOfContainers, + registerResponse.getContainersFromPreviousAttempts().size()); + + // Release all containers + releaseContainersAndAssert( + registerResponse.getContainersFromPreviousAttempts()); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + return null; + } + }); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index d4b8735d464..23c80ae9090 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -44,6 +44,15 @@ public class TestableFederationInterceptor extends FederationInterceptor { private AtomicInteger runningIndex = new AtomicInteger(0); private MockResourceManagerFacade mockRm; + public TestableFederationInterceptor() { + } + + public TestableFederationInterceptor(MockResourceManagerFacade homeRM, + ConcurrentHashMap secondaries) { + mockRm = homeRM; + secondaryResourceManagers = secondaries; + } + @Override protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( ExecutorService threadPool) { @@ -68,7 +77,7 @@ public class TestableFederationInterceptor extends FederationInterceptor { // We create one instance of the mock resource manager per sub cluster. Keep // track of the instances of the RMs in the map keyed by the sub cluster id synchronized (this.secondaryResourceManagers) { - if (this.secondaryResourceManagers.contains(subClusterId)) { + if (this.secondaryResourceManagers.containsKey(subClusterId)) { return (T) this.secondaryResourceManagers.get(subClusterId); } else { // The running index here is used to simulate different RM_EPOCH to @@ -91,6 +100,15 @@ public class TestableFederationInterceptor extends FederationInterceptor { } } + protected MockResourceManagerFacade getHomeRM() { + return mockRm; + } + + protected ConcurrentHashMap + getSecondaryRMs() { + return secondaryResourceManagers; + } + /** * Extends the UnmanagedAMPoolManager and overrides methods to provide a * testable implementation of UnmanagedAMPoolManager. @@ -104,9 +122,9 @@ public class TestableFederationInterceptor extends FederationInterceptor { @Override public UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { return new TestableUnmanagedApplicationManager(conf, appId, queueName, - submitter, appNameSuffix); + submitter, appNameSuffix, keepContainersAcrossApplicationAttempts); } } @@ -119,8 +137,9 @@ public class TestableFederationInterceptor extends FederationInterceptor { public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { - super(conf, appId, queueName, submitter, appNameSuffix); + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + super(conf, appId, queueName, submitter, appNameSuffix, + keepContainersAcrossApplicationAttempts); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index de282fd0631..9834b3a6c25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.service.AbstractService; @@ -951,9 +952,10 @@ public class MiniYARNCluster extends CompositeService { protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, Token localToken, - Map recoveredDataMap, boolean isRecovery) { + Map recoveredDataMap, boolean isRecovery, + Credentials credentials) { super.initializePipeline(applicationAttemptId, user, amrmToken, - localToken, recoveredDataMap, isRecovery); + localToken, recoveredDataMap, isRecovery, credentials); RequestInterceptor rt = getPipelines() .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); // The DefaultRequestInterceptor will generally be the last diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md index ef0f7131fd8..40d46cb0e65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md @@ -141,6 +141,8 @@ The figure shows a sequence diagram for the following job execution flow: b. The security tokens are also modified by the NM when launching the AM, so that the AM can only talk with the AMRMProxy. Any future communication from AM to the YARN RM is mediated by the AMRMProxy. 7. The AM will then request containers using the locality information exposed by HDFS. 8. Based on a policy the AMRMProxy can impersonate the AM on other sub-clusters, by submitting an Unmanaged AM, and by forwarding the AM heartbeats to relevant sub-clusters. + a. Federation supports multiple application attempts with AMRMProxy HA. AM containers will have different attempt id in home sub-cluster, but the same Unmanaged AM in secondaries will be used across attempts. + b. When AMRMProxy HA is enabled, UAM token will be stored in Yarn Registry. In the registerApplicationMaster call of each application attempt, AMRMProxy will go fetch existing UAM tokens from registry (if any) and re-attached to the existing UAMs. 9. The AMRMProxy will use both locality information and a pluggable policy configured in the state-store to decide whether to forward the resource requests received by the AM to the Home RM or to one (or more) Secondary RMs. In Figure 1, we show the case in which the AMRMProxy decides to forward the request to the secondary RM. 10. The secondary RM will provide the AMRMProxy with valid container tokens to start a new container on some node in its sub-cluster. This mechanism ensures that each sub-cluster uses its own security tokens and avoids the need for a cluster wide shared secret to create tokens. 11. The AMRMProxy forwards the allocation response back to the AM. @@ -262,16 +264,17 @@ These are extra configurations that should appear in the **conf/yarn-site.xml** | Property | Example | Description | |:---- |:---- | -| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled. -|`yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor. +| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled. | +| `yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor. | | `yarn.client.failover-proxy-provider` | `org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider` | The class used to connect to the RMs by looking up the membership information in federation state-store. This must be set if federation is enabled, even if RM HA is not enabled.| Optional: | Property | Example | Description | |:---- |:---- | -|`yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. | -|`yarn.federation.cache-ttl.secs` | `300` | The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. | +| `yarn.nodemanager.amrmproxy.ha.enable` | `true` | Whether or not the AMRMProxy HA is enabled for multiple application attempt suppport. | +| `yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. | +| `yarn.federation.cache-ttl.secs` | `300` | The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. | Running a Sample Job --------------------