YARN-8010. Add config in FederationRMFailoverProxy to not bypass facade cache when failing over. (Botong Huang via Subru).

(cherry picked from commit 09999d7e014fde717e8b122773b68664f4594106)
(cherry picked from commit a0091ec4b3c28a468f95d8f066a769320937ad6f)
This commit is contained in:
Subru Krishnan 2018-03-28 11:33:19 -07:00 committed by Wangda Tan
parent b56f657940
commit f39f395060
4 changed files with 94 additions and 9 deletions

View File

@ -3087,15 +3087,18 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR =
FEDERATION_PREFIX + "flush-cache-for-rm-addr";
public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true;
public static final String FEDERATION_REGISTRY_BASE_KEY =
FEDERATION_PREFIX + "registry.base-dir";
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
"yarnfederation/";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";

View File

@ -79,6 +79,8 @@ public void initializeMemberVariables() {
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR);
configurationPropsToSkipCompare
.add(YarnConfiguration.RM_EPOCH);
configurationPropsToSkipCompare

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
@ -49,7 +50,11 @@
import org.junit.Test;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@ -61,12 +66,20 @@ public class TestFederationRMFailoverProxyProvider {
private FederationStateStore stateStore;
private final String dummyCapability = "cap";
private GetClusterMetricsResponse threadResponse;
@Before
public void setUp() throws IOException, YarnException {
conf = new YarnConfiguration();
stateStore = new MemoryFederationStateStore();
// Configure Facade cache to use a very long ttl
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 60 * 60);
stateStore = spy(new MemoryFederationStateStore());
stateStore.init(conf);
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
verify(stateStore, times(0))
.getSubClusters(any(GetSubClustersInfoRequest.class));
}
@After
@ -75,12 +88,25 @@ public void tearDown() throws Exception {
stateStore = null;
}
@Test
/**
 * Runs the shared failover scenario via {@code testProxyProvider(true)},
 * which sets {@code YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR}
 * to {@code true} before creating the RM proxy.
 *
 * @throws Exception if the delegated scenario fails
 */
@Test(timeout = 60000)
public void testFederationRMFailoverProxyProvider() throws Exception {
testProxyProvider(true);
}
/**
 * Runs the shared failover scenario via {@code testProxyProvider(false)},
 * which sets {@code YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR}
 * to {@code false}. In that mode the scenario itself forces a facade cache
 * refresh so the client can pick up the new RM address.
 *
 * @throws Exception if the delegated scenario fails
 */
@Test (timeout=60000)
public void testFederationRMFailoverProxyProviderWithoutFlushFacadeCache()
throws Exception {
testProxyProvider(false);
}
private void testProxyProvider(boolean facadeFlushCache) throws Exception {
final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
final MiniYARNCluster cluster = new MiniYARNCluster(
"testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
conf.setBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
facadeFlushCache);
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
@ -104,10 +130,16 @@ public void testFederationRMFailoverProxyProvider() throws Exception {
.createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
UserGroupInformation.getCurrentUser());
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
// client will retry until the rm becomes active.
GetClusterMetricsResponse response =
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
// validate response
checkResponse(response);
@ -118,7 +150,50 @@ public void testFederationRMFailoverProxyProvider() throws Exception {
// Transition rm2 to active;
makeRMActive(subClusterId, cluster, 1);
response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
threadResponse = null;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
// In the non-flush-cache case, the client keeps hitting the cached (old)
// RM address and keeps failing until the facade cache is flushed
threadResponse =
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
} catch (YarnException | IOException e) {
e.printStackTrace();
}
}
});
thread.start();
if (!facadeFlushCache) {
// Add a wait so that hopefully the thread has started hitting the old
// cached RM address
Thread.sleep(500);
// Should still be hitting cache
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
// Force flush cache, so that it will pick up the new RM address
FederationStateStoreFacade.getInstance().getSubCluster(subClusterId,
true);
}
// Wait for the thread to finish and grab result
thread.join();
response = threadResponse;
if (facadeFlushCache) {
verify(stateStore, atLeast(2))
.getSubClusters(any(GetSubClustersInfoRequest.class));
} else {
verify(stateStore, times(2))
.getSubClusters(any(GetSubClustersInfoRequest.class));
}
// validate response
checkResponse(response);

View File

@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider<T>
private FederationStateStoreFacade facade;
private SubClusterId subClusterId;
private UserGroupInformation originalUser;
private boolean federationFailoverEnabled = false;
private boolean federationFailoverEnabled;
private boolean flushFacadeCacheForYarnRMAddr;
@Override
public void init(Configuration configuration, RMProxy<T> proxy,
@ -75,13 +76,16 @@ public void init(Configuration configuration, RMProxy<T> proxy,
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
this.subClusterId = SubClusterId.newInstance(clusterId);
this.facade = facade.getInstance();
this.facade = FederationStateStoreFacade.getInstance();
if (configuration instanceof YarnConfiguration) {
this.conf = (YarnConfiguration) configuration;
}
federationFailoverEnabled =
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
flushFacadeCacheForYarnRMAddr =
conf.getBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
YarnConfiguration.DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
@ -119,7 +123,8 @@ private T getProxyInternal(boolean isFailover) {
try {
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
subClusterId);
subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
subClusterInfo = facade.getSubCluster(subClusterId,
this.flushFacadeCacheForYarnRMAddr && isFailover);
// updating the conf with the refreshed RM addresses as proxy
// creations are based out of conf
updateRMAddress(subClusterInfo);