YARN-8010. Add config in FederationRMFailoverProxy to not bypass facade cache when failing over. (Botong Huang via Subru).
(cherry picked from commit 2a2ef15caf791f30c471526c1b74e68803f0c405) (cherry picked from commit 85eebf1bebc7b191dcd692395f77903257cd85c4)
This commit is contained in:
parent
762126fac8
commit
276a5ca632
@ -3087,15 +3087,18 @@ public static boolean isAclEnabled(Configuration conf) {
|
||||
|
||||
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
|
||||
FEDERATION_PREFIX + "cache-ttl.secs";
|
||||
// 5 minutes
|
||||
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
|
||||
|
||||
public static final String FEDERATION_FLUSh_CACHE_FOR_RM_ADDR =
|
||||
FEDERATION_PREFIX + "flush-cache-for-rm-addr";
|
||||
public static final boolean DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR = true;
|
||||
|
||||
public static final String FEDERATION_REGISTRY_BASE_KEY =
|
||||
FEDERATION_PREFIX + "registry.base-dir";
|
||||
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
|
||||
"yarnfederation/";
|
||||
|
||||
// 5 minutes
|
||||
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
|
||||
|
||||
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
|
||||
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
|
||||
|
||||
|
@ -2924,6 +2924,15 @@
|
||||
<value>300</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Whether to flush FederationStateStoreFacade cache to get subcluster info
|
||||
when FederationRMFailoverProxyProvider is performing failover.
|
||||
</description>
|
||||
<name>yarn.federation.flush-cache-for-rm-addr</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The registry base directory for federation.</description>
|
||||
<name>yarn.federation.registry.base-dir</name>
|
||||
|
@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider<T>
|
||||
private FederationStateStoreFacade facade;
|
||||
private SubClusterId subClusterId;
|
||||
private UserGroupInformation originalUser;
|
||||
private boolean federationFailoverEnabled = false;
|
||||
private boolean federationFailoverEnabled;
|
||||
private boolean flushFacadeCacheForYarnRMAddr;
|
||||
|
||||
@Override
|
||||
public void init(Configuration configuration, RMProxy<T> proxy,
|
||||
@ -75,13 +76,16 @@ public void init(Configuration configuration, RMProxy<T> proxy,
|
||||
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
|
||||
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
|
||||
this.subClusterId = SubClusterId.newInstance(clusterId);
|
||||
this.facade = facade.getInstance();
|
||||
this.facade = FederationStateStoreFacade.getInstance();
|
||||
if (configuration instanceof YarnConfiguration) {
|
||||
this.conf = (YarnConfiguration) configuration;
|
||||
}
|
||||
federationFailoverEnabled =
|
||||
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
|
||||
flushFacadeCacheForYarnRMAddr =
|
||||
conf.getBoolean(YarnConfiguration.FEDERATION_FLUSh_CACHE_FOR_RM_ADDR,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR);
|
||||
|
||||
conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||
@ -119,7 +123,8 @@ private T getProxyInternal(boolean isFailover) {
|
||||
try {
|
||||
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
|
||||
subClusterId);
|
||||
subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
|
||||
subClusterInfo = facade.getSubCluster(subClusterId,
|
||||
this.flushFacadeCacheForYarnRMAddr && isFailover);
|
||||
// updating the conf with the refreshed RM addresses as proxy
|
||||
// creations are based out of conf
|
||||
updateRMAddress(subClusterInfo);
|
||||
|
Loading…
x
Reference in New Issue
Block a user