From 0ab1d2f25ba39391bf7bc5e486bf75c828a0e856 Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Tue, 27 Mar 2018 17:39:46 -0700 Subject: [PATCH] YARN-8010. Add config in FederationRMFailoverProxy to not bypass facade cache when failing over. (Botong Huang via Subru). (cherry picked from commit 2a2ef15caf791f30c471526c1b74e68803f0c405) --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 9 ++++++--- .../src/main/resources/yarn-default.xml | 9 +++++++++ .../failover/FederationRMFailoverProxyProvider.java | 11 ++++++++--- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7c25be3db67..ac7e1687348 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2806,15 +2806,18 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; + // 5 minutes + public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + + public static final String FEDERATION_FLUSh_CACHE_FOR_RM_ADDR = + FEDERATION_PREFIX + "flush-cache-for-rm-addr"; + public static final boolean DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR = true; public static final String FEDERATION_REGISTRY_BASE_KEY = FEDERATION_PREFIX + "registry.base-dir"; public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = "yarnfederation/"; - // 5 minutes - public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; - public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 24279f5ad7c..5225da27d1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2823,6 +2823,15 @@ 300 + + + Whether to flush FederationStateStoreFacade cache to get subcluster info + when FederationRMFailoverProxyProvider is performing failover. + + yarn.federation.flush-cache-for-rm-addr + true + + The registry base directory for federation. yarn.federation.registry.base-dir diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java index c631208b783..b72b1993594 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java @@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider private FederationStateStoreFacade facade; private SubClusterId subClusterId; private UserGroupInformation originalUser; - private boolean federationFailoverEnabled = false; + private boolean federationFailoverEnabled; + private boolean flushFacadeCacheForYarnRMAddr; @Override public void init(Configuration configuration, RMProxy proxy, @@ -75,13 +76,16 @@ public class FederationRMFailoverProxyProvider String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID); Preconditions.checkNotNull(clusterId, "Missing RM ClusterId"); this.subClusterId = SubClusterId.newInstance(clusterId); - this.facade = facade.getInstance(); + this.facade = FederationStateStoreFacade.getInstance(); if (configuration instanceof YarnConfiguration) { this.conf = (YarnConfiguration) configuration; } federationFailoverEnabled = conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED); + flushFacadeCacheForYarnRMAddr = + conf.getBoolean(YarnConfiguration.FEDERATION_FLUSh_CACHE_FOR_RM_ADDR, + YarnConfiguration.DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR); conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, @@ -119,7 +123,8 @@ public class FederationRMFailoverProxyProvider try { LOG.info("Failing over to the ResourceManager for SubClusterId: {}", subClusterId); - subClusterInfo = facade.getSubCluster(subClusterId, isFailover); + subClusterInfo = facade.getSubCluster(subClusterId, + this.flushFacadeCacheForYarnRMAddr && isFailover); // updating the conf with the refreshed RM addresses as proxy // creations are based out of conf updateRMAddress(subClusterInfo);