YARN-8010. Add config in FederationRMFailoverProxy to not bypass facade cache when failing over. (Botong Huang via Subru).

(cherry picked from commit 09999d7e01)
This commit is contained in:
Subru Krishnan 2018-03-28 11:33:19 -07:00
parent ed49f59e18
commit a0091ec4b3
4 changed files with 94 additions and 9 deletions

View File

@ -3087,15 +3087,18 @@ public class YarnConfiguration extends Configuration {
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR =
FEDERATION_PREFIX + "flush-cache-for-rm-addr";
public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true;
public static final String FEDERATION_REGISTRY_BASE_KEY =
FEDERATION_PREFIX + "registry.base-dir";
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
"yarnfederation/";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";

View File

@ -79,6 +79,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR);
configurationPropsToSkipCompare
.add(YarnConfiguration.RM_EPOCH);
configurationPropsToSkipCompare

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProvider
import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
@ -49,7 +50,11 @@ import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@ -61,12 +66,20 @@ public class TestFederationRMFailoverProxyProvider {
private FederationStateStore stateStore;
private final String dummyCapability = "cap";
private GetClusterMetricsResponse threadResponse;
@Before
public void setUp() throws IOException, YarnException {
conf = new YarnConfiguration();
stateStore = new MemoryFederationStateStore();
// Configure Facade cache to use a very long ttl
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 60 * 60);
stateStore = spy(new MemoryFederationStateStore());
stateStore.init(conf);
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
verify(stateStore, times(0))
.getSubClusters(any(GetSubClustersInfoRequest.class));
}
@After
@ -75,12 +88,25 @@ public class TestFederationRMFailoverProxyProvider {
stateStore = null;
}
@Test
@Test(timeout = 60000)
public void testFederationRMFailoverProxyProvider() throws Exception {
testProxyProvider(true);
}
@Test (timeout=60000)
public void testFederationRMFailoverProxyProviderWithoutFlushFacadeCache()
throws Exception {
testProxyProvider(false);
}
private void testProxyProvider(boolean facadeFlushCache) throws Exception {
final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
final MiniYARNCluster cluster = new MiniYARNCluster(
"testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
conf.setBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
facadeFlushCache);
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
@ -104,10 +130,16 @@ public class TestFederationRMFailoverProxyProvider {
.createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
UserGroupInformation.getCurrentUser());
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
// client will retry until the rm becomes active.
GetClusterMetricsResponse response =
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
// validate response
checkResponse(response);
@ -118,7 +150,50 @@ public class TestFederationRMFailoverProxyProvider {
// Transition rm2 to active;
makeRMActive(subClusterId, cluster, 1);
response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
threadResponse = null;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
// In non flush cache case, we will be hitting the cache with old RM
// address and keep failing before the cache is flushed
threadResponse =
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
} catch (YarnException | IOException e) {
e.printStackTrace();
}
}
});
thread.start();
if (!facadeFlushCache) {
// Add a wait so that hopefully the thread has started hitting old cached
Thread.sleep(500);
// Should still be hitting cache
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
// Force flush cache, so that it will pick up the new RM address
FederationStateStoreFacade.getInstance().getSubCluster(subClusterId,
true);
}
// Wait for the thread to finish and grab result
thread.join();
response = threadResponse;
if (facadeFlushCache) {
verify(stateStore, atLeast(2))
.getSubClusters(any(GetSubClustersInfoRequest.class));
} else {
verify(stateStore, times(2))
.getSubClusters(any(GetSubClustersInfoRequest.class));
}
// validate response
checkResponse(response);

View File

@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider<T>
private FederationStateStoreFacade facade;
private SubClusterId subClusterId;
private UserGroupInformation originalUser;
private boolean federationFailoverEnabled = false;
private boolean federationFailoverEnabled;
private boolean flushFacadeCacheForYarnRMAddr;
@Override
public void init(Configuration configuration, RMProxy<T> proxy,
@ -75,13 +76,16 @@ public class FederationRMFailoverProxyProvider<T>
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
this.subClusterId = SubClusterId.newInstance(clusterId);
this.facade = facade.getInstance();
this.facade = FederationStateStoreFacade.getInstance();
if (configuration instanceof YarnConfiguration) {
this.conf = (YarnConfiguration) configuration;
}
federationFailoverEnabled =
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
flushFacadeCacheForYarnRMAddr =
conf.getBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
YarnConfiguration.DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
@ -119,7 +123,8 @@ public class FederationRMFailoverProxyProvider<T>
try {
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
subClusterId);
subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
subClusterInfo = facade.getSubCluster(subClusterId,
this.flushFacadeCacheForYarnRMAddr && isFailover);
// updating the conf with the refreshed RM addresses as proxy
// creations are based out of conf
updateRMAddress(subClusterInfo);