diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index b76f45798e9..5a2c1f91dd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3087,15 +3087,18 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; + // 5 minutes + public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + + public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = + FEDERATION_PREFIX + "flush-cache-for-rm-addr"; + public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true; public static final String FEDERATION_REGISTRY_BASE_KEY = FEDERATION_PREFIX + "registry.base-dir"; public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = "yarnfederation/"; - // 5 minutes - public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; - public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 9fe4f884899..f4d1ac0a255 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -79,6 +79,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED); configurationPropsToSkipCompare .add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR); configurationPropsToSkipCompare .add(YarnConfiguration.RM_EPOCH); configurationPropsToSkipCompare diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java index e3f91557ee4..0a7ee3ff734 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProvider import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; @@ -49,7 +50,11 @@ import org.junit.Before; import org.junit.Test; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -61,12 +66,20 @@ public class TestFederationRMFailoverProxyProvider { private FederationStateStore stateStore; private final String dummyCapability = "cap"; + private GetClusterMetricsResponse threadResponse; + @Before public void setUp() throws IOException, YarnException { conf = new YarnConfiguration(); - stateStore = new MemoryFederationStateStore(); + + // Configure Facade cache to use a very long ttl + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 60 * 60); + + stateStore = spy(new MemoryFederationStateStore()); stateStore.init(conf); FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf); + verify(stateStore, times(0)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); } @After @@ -75,12 +88,25 @@ public class TestFederationRMFailoverProxyProvider { stateStore = null; } - @Test + @Test(timeout = 60000) public void testFederationRMFailoverProxyProvider() throws Exception { + testProxyProvider(true); + } + + @Test (timeout=60000) + public void testFederationRMFailoverProxyProviderWithoutFlushFacadeCache() + throws Exception { + testProxyProvider(false); + } + + private void testProxyProvider(boolean facadeFlushCache) throws Exception { final SubClusterId subClusterId = SubClusterId.newInstance("SC-1"); final MiniYARNCluster cluster = new MiniYARNCluster( "testFederationRMFailoverProxyProvider", 3, 0, 1, 1); + conf.setBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR, + facadeFlushCache); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); @@ -104,10 +130,16 @@ public class TestFederationRMFailoverProxyProvider { .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId, UserGroupInformation.getCurrentUser()); + verify(stateStore, times(1)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + // client will retry until the rm becomes active. GetClusterMetricsResponse response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + verify(stateStore, times(1)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + // validate response checkResponse(response); @@ -118,7 +150,50 @@ public class TestFederationRMFailoverProxyProvider { // Transition rm2 to active; makeRMActive(subClusterId, cluster, 1); - response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + + verify(stateStore, times(1)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + + threadResponse = null; + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + // In non flush cache case, we will be hitting the cache with old RM + // address and keep failing before the cache is flushed + threadResponse = + client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + } catch (YarnException | IOException e) { + e.printStackTrace(); + } + } + }); + thread.start(); + + if (!facadeFlushCache) { + // Add a wait so that hopefully the thread has started hitting old cached + Thread.sleep(500); + + // Should still be hitting cache + verify(stateStore, times(1)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + + // Force flush cache, so that it will pick up the new RM address + FederationStateStoreFacade.getInstance().getSubCluster(subClusterId, + true); + } + + // Wait for the thread to finish and grab result + thread.join(); + response = threadResponse; + + if (facadeFlushCache) { + verify(stateStore, atLeast(2)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + } else { + verify(stateStore, times(2)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + } // validate response checkResponse(response); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java index c631208b783..cf6d1ef5bd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java @@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider private FederationStateStoreFacade facade; private SubClusterId subClusterId; private UserGroupInformation originalUser; - private boolean federationFailoverEnabled = false; + private boolean federationFailoverEnabled; + private boolean flushFacadeCacheForYarnRMAddr; @Override public void init(Configuration configuration, RMProxy proxy, @@ -75,13 +76,16 @@ public class FederationRMFailoverProxyProvider String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID); Preconditions.checkNotNull(clusterId, "Missing RM ClusterId"); this.subClusterId = SubClusterId.newInstance(clusterId); - this.facade = facade.getInstance(); + this.facade = FederationStateStoreFacade.getInstance(); if (configuration instanceof YarnConfiguration) { this.conf = (YarnConfiguration) configuration; } federationFailoverEnabled = conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED); + flushFacadeCacheForYarnRMAddr = + conf.getBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR, + YarnConfiguration.DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR); conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, @@ -119,7 +123,8 @@ public class FederationRMFailoverProxyProvider try { LOG.info("Failing over to the ResourceManager for SubClusterId: {}", subClusterId); - subClusterInfo = facade.getSubCluster(subClusterId, isFailover); + subClusterInfo = facade.getSubCluster(subClusterId, + this.flushFacadeCacheForYarnRMAddr && isFailover); // updating the conf with the refreshed RM addresses as proxy // creations are based out of conf updateRMAddress(subClusterInfo);