YARN-8010. Add config in FederationRMFailoverProxy to not bypass facade cache when failing over. (Botong Huang via Subru).

(cherry picked from commit 09999d7e01)
This commit is contained in:
Subru Krishnan 2018-03-28 11:33:19 -07:00
parent b36b438043
commit 304ce18714
4 changed files with 95 additions and 10 deletions

View File

@ -2806,15 +2806,18 @@ public class YarnConfiguration extends Configuration {
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs"; FEDERATION_PREFIX + "cache-ttl.secs";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR =
FEDERATION_PREFIX + "flush-cache-for-rm-addr";
public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true;
public static final String FEDERATION_REGISTRY_BASE_KEY = public static final String FEDERATION_REGISTRY_BASE_KEY =
FEDERATION_PREFIX + "registry.base-dir"; FEDERATION_PREFIX + "registry.base-dir";
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
"yarnfederation/"; "yarnfederation/";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";

View File

@ -79,6 +79,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED); .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
configurationPropsToSkipCompare configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS); .add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR);
configurationPropsToSkipCompare configurationPropsToSkipCompare
.add(YarnConfiguration.RM_EPOCH); .add(YarnConfiguration.RM_EPOCH);
configurationPropsToSkipCompare configurationPropsToSkipCompare

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProvider
import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider; import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
@ -49,7 +50,11 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.mockito.Mockito.any; import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
/** /**
@ -61,12 +66,20 @@ public class TestFederationRMFailoverProxyProvider {
private FederationStateStore stateStore; private FederationStateStore stateStore;
private final String dummyCapability = "cap"; private final String dummyCapability = "cap";
private GetClusterMetricsResponse threadResponse;
@Before @Before
public void setUp() throws IOException, YarnException { public void setUp() throws IOException, YarnException {
conf = new YarnConfiguration(); conf = new YarnConfiguration();
stateStore = new MemoryFederationStateStore();
// Configure Facade cache to use a very long ttl
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 60 * 60);
stateStore = spy(new MemoryFederationStateStore());
stateStore.init(conf); stateStore.init(conf);
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf); FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
verify(stateStore, times(0))
.getSubClusters(any(GetSubClustersInfoRequest.class));
} }
@After @After
@ -75,12 +88,25 @@ public class TestFederationRMFailoverProxyProvider {
stateStore = null; stateStore = null;
} }
@Test @Test(timeout = 60000)
public void testFederationRMFailoverProxyProvider() throws Exception { public void testFederationRMFailoverProxyProvider() throws Exception {
testProxyProvider(true);
}
@Test (timeout=60000)
public void testFederationRMFailoverProxyProviderWithoutFlushFacadeCache()
throws Exception {
testProxyProvider(false);
}
private void testProxyProvider(boolean facadeFlushCache) throws Exception {
final SubClusterId subClusterId = SubClusterId.newInstance("SC-1"); final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
final MiniYARNCluster cluster = new MiniYARNCluster( final MiniYARNCluster cluster = new MiniYARNCluster(
"testFederationRMFailoverProxyProvider", 3, 0, 1, 1); "testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
conf.setBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
facadeFlushCache);
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
@ -100,14 +126,20 @@ public class TestFederationRMFailoverProxyProvider {
// Transition rm3 to active; // Transition rm3 to active;
makeRMActive(subClusterId, cluster, 2); makeRMActive(subClusterId, cluster, 2);
ApplicationClientProtocol client = FederationProxyProviderUtil final ApplicationClientProtocol client = FederationProxyProviderUtil
.createRMProxy(conf, ApplicationClientProtocol.class, subClusterId, .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
UserGroupInformation.getCurrentUser()); UserGroupInformation.getCurrentUser());
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
// client will retry until the rm becomes active. // client will retry until the rm becomes active.
GetClusterMetricsResponse response = GetClusterMetricsResponse response =
client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
// validate response // validate response
checkResponse(response); checkResponse(response);
@ -118,7 +150,50 @@ public class TestFederationRMFailoverProxyProvider {
// Transition rm2 to active; // Transition rm2 to active;
makeRMActive(subClusterId, cluster, 1); makeRMActive(subClusterId, cluster, 1);
response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
threadResponse = null;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
// In non flush cache case, we will be hitting the cache with old RM
// address and keep failing before the cache is flushed
threadResponse =
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
} catch (YarnException | IOException e) {
e.printStackTrace();
}
}
});
thread.start();
if (!facadeFlushCache) {
// Add a wait so that hopefully the thread has started hitting old cached
Thread.sleep(500);
// Should still be hitting cache
verify(stateStore, times(1))
.getSubClusters(any(GetSubClustersInfoRequest.class));
// Force flush cache, so that it will pick up the new RM address
FederationStateStoreFacade.getInstance().getSubCluster(subClusterId,
true);
}
// Wait for the thread to finish and grab result
thread.join();
response = threadResponse;
if (facadeFlushCache) {
verify(stateStore, atLeast(2))
.getSubClusters(any(GetSubClustersInfoRequest.class));
} else {
verify(stateStore, times(2))
.getSubClusters(any(GetSubClustersInfoRequest.class));
}
// validate response // validate response
checkResponse(response); checkResponse(response);

View File

@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider<T>
private FederationStateStoreFacade facade; private FederationStateStoreFacade facade;
private SubClusterId subClusterId; private SubClusterId subClusterId;
private UserGroupInformation originalUser; private UserGroupInformation originalUser;
private boolean federationFailoverEnabled = false; private boolean federationFailoverEnabled;
private boolean flushFacadeCacheForYarnRMAddr;
@Override @Override
public void init(Configuration configuration, RMProxy<T> proxy, public void init(Configuration configuration, RMProxy<T> proxy,
@ -75,13 +76,16 @@ public class FederationRMFailoverProxyProvider<T>
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID); String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId"); Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
this.subClusterId = SubClusterId.newInstance(clusterId); this.subClusterId = SubClusterId.newInstance(clusterId);
this.facade = facade.getInstance(); this.facade = FederationStateStoreFacade.getInstance();
if (configuration instanceof YarnConfiguration) { if (configuration instanceof YarnConfiguration) {
this.conf = (YarnConfiguration) configuration; this.conf = (YarnConfiguration) configuration;
} }
federationFailoverEnabled = federationFailoverEnabled =
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED); YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
flushFacadeCacheForYarnRMAddr =
conf.getBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
YarnConfiguration.DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR);
conf.setInt( conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
@ -119,7 +123,8 @@ public class FederationRMFailoverProxyProvider<T>
try { try {
LOG.info("Failing over to the ResourceManager for SubClusterId: {}", LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
subClusterId); subClusterId);
subClusterInfo = facade.getSubCluster(subClusterId, isFailover); subClusterInfo = facade.getSubCluster(subClusterId,
this.flushFacadeCacheForYarnRMAddr && isFailover);
// updating the conf with the refreshed RM addresses as proxy // updating the conf with the refreshed RM addresses as proxy
// creations are based out of conf // creations are based out of conf
updateRMAddress(subClusterInfo); updateRMAddress(subClusterInfo);