YARN-8010. Add config in FederationRMFailoverProxy to not bypass facade cache when failing over. (Botong Huang via Subru).
This commit is contained in:
parent
3fe41c65d8
commit
2a2ef15caf
|
@ -3089,15 +3089,18 @@ public class YarnConfiguration extends Configuration {
|
||||||
|
|
||||||
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
|
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
|
||||||
FEDERATION_PREFIX + "cache-ttl.secs";
|
FEDERATION_PREFIX + "cache-ttl.secs";
|
||||||
|
// 5 minutes
|
||||||
|
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
|
||||||
|
|
||||||
|
public static final String FEDERATION_FLUSh_CACHE_FOR_RM_ADDR =
|
||||||
|
FEDERATION_PREFIX + "flush-cache-for-rm-addr";
|
||||||
|
public static final boolean DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR = true;
|
||||||
|
|
||||||
public static final String FEDERATION_REGISTRY_BASE_KEY =
|
public static final String FEDERATION_REGISTRY_BASE_KEY =
|
||||||
FEDERATION_PREFIX + "registry.base-dir";
|
FEDERATION_PREFIX + "registry.base-dir";
|
||||||
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
|
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
|
||||||
"yarnfederation/";
|
"yarnfederation/";
|
||||||
|
|
||||||
// 5 minutes
|
|
||||||
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
|
|
||||||
|
|
||||||
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
|
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
|
||||||
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
|
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
|
||||||
|
|
||||||
|
|
|
@ -2924,6 +2924,15 @@
|
||||||
<value>300</value>
|
<value>300</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
Whether to flush FederationStateStoreFacade cache to get subcluster info
|
||||||
|
when FederationRMFailoverProxyProvider is performing failover.
|
||||||
|
</description>
|
||||||
|
<name>yarn.federation.flush-cache-for-rm-addr</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>The registry base directory for federation.</description>
|
<description>The registry base directory for federation.</description>
|
||||||
<name>yarn.federation.registry.base-dir</name>
|
<name>yarn.federation.registry.base-dir</name>
|
||||||
|
|
|
@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider<T>
|
||||||
private FederationStateStoreFacade facade;
|
private FederationStateStoreFacade facade;
|
||||||
private SubClusterId subClusterId;
|
private SubClusterId subClusterId;
|
||||||
private UserGroupInformation originalUser;
|
private UserGroupInformation originalUser;
|
||||||
private boolean federationFailoverEnabled = false;
|
private boolean federationFailoverEnabled;
|
||||||
|
private boolean flushFacadeCacheForYarnRMAddr;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration configuration, RMProxy<T> proxy,
|
public void init(Configuration configuration, RMProxy<T> proxy,
|
||||||
|
@ -75,13 +76,16 @@ public class FederationRMFailoverProxyProvider<T>
|
||||||
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
|
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
|
||||||
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
|
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
|
||||||
this.subClusterId = SubClusterId.newInstance(clusterId);
|
this.subClusterId = SubClusterId.newInstance(clusterId);
|
||||||
this.facade = facade.getInstance();
|
this.facade = FederationStateStoreFacade.getInstance();
|
||||||
if (configuration instanceof YarnConfiguration) {
|
if (configuration instanceof YarnConfiguration) {
|
||||||
this.conf = (YarnConfiguration) configuration;
|
this.conf = (YarnConfiguration) configuration;
|
||||||
}
|
}
|
||||||
federationFailoverEnabled =
|
federationFailoverEnabled =
|
||||||
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
|
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
|
||||||
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
|
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
|
||||||
|
flushFacadeCacheForYarnRMAddr =
|
||||||
|
conf.getBoolean(YarnConfiguration.FEDERATION_FLUSh_CACHE_FOR_RM_ADDR,
|
||||||
|
YarnConfiguration.DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR);
|
||||||
|
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||||
|
@ -119,7 +123,8 @@ public class FederationRMFailoverProxyProvider<T>
|
||||||
try {
|
try {
|
||||||
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
|
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
|
||||||
subClusterId);
|
subClusterId);
|
||||||
subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
|
subClusterInfo = facade.getSubCluster(subClusterId,
|
||||||
|
this.flushFacadeCacheForYarnRMAddr && isFailover);
|
||||||
// updating the conf with the refreshed RM addresses as proxy
|
// updating the conf with the refreshed RM addresses as proxy
|
||||||
// creations are based out of conf
|
// creations are based out of conf
|
||||||
updateRMAddress(subClusterInfo);
|
updateRMAddress(subClusterInfo);
|
||||||
|
|
Loading…
Reference in New Issue