diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
index 133b377c0b5..528b6421a3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.conf;
-import com.google.common.annotations.VisibleForTesting;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -27,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
+import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public class HAUtil {
@@ -44,6 +45,29 @@ public class HAUtil {
throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
}
+ /**
+ * Returns true if Federation is configured.
+ *
+ * @param conf Configuration
+ * @return true if federation is configured in the configuration; else false.
+ */
+ public static boolean isFederationEnabled(Configuration conf) {
+ return conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
+ YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
+ }
+
+ /**
+ * Returns true if RM failover is enabled in a Federation setting.
+ *
+ * @param conf Configuration
+ * @return true if RM failover is enabled in conjunction with Federation in
+ * the configuration; else false.
+ */
+ public static boolean isFederationFailoverEnabled(Configuration conf) {
+ return conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
+ YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
+ }
+
/**
* Returns true if Resource Manager HA is configured.
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fe6c7b8ad2a..612d89b608b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2561,6 +2561,16 @@ public class YarnConfiguration extends Configuration {
public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
+ public static final String FEDERATION_ENABLED = FEDERATION_PREFIX + "enabled";
+ public static final boolean DEFAULT_FEDERATION_ENABLED = false;
+
+ public static final String FEDERATION_FAILOVER_ENABLED =
+ FEDERATION_PREFIX + "failover.enabled";
+ public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
+
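+ // Set programmatically by FederationProxyProviderUtil when creating a proxy
+ // to a specific sub-cluster; it is not meant to be configured in
+ // yarn-site.xml, which is why TestYarnConfigurationFields skips it.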
+ public static final String FEDERATION_SUBCLUSTER_ID =
+ FEDERATION_PREFIX + "sub-cluster.id";
+
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
FEDERATION_PREFIX + "state-store.class";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index bfc253451ca..c4d8f383513 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -71,6 +71,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
// Federation default configs to be ignored
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
// Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress"
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
new file mode 100644
index 00000000000..fa3523c9f70
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for FederationRMFailoverProxyProvider.
+ */
+public class TestFederationRMFailoverProxyProvider {
+
+ private Configuration conf;
+ private FederationStateStore stateStore;
+ private final String dummyCapability = "cap";
+
+ @Before
+ public void setUp() throws IOException, YarnException {
+ conf = new YarnConfiguration();
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(conf);
+ FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stateStore.close();
+ stateStore = null;
+ }
+
+ @Test
+ public void testFederationRMFailoverProxyProvider() throws Exception {
+ final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
+ final MiniYARNCluster cluster = new MiniYARNCluster(
+ "testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
+
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+ conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3");
+
+ conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
+ 2000);
+
+ HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
+ HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
+ HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+
+ cluster.init(conf);
+ cluster.start();
+
+ // Transition rm3 to active.
+ makeRMActive(subClusterId, cluster, 2);
+
+ ApplicationClientProtocol client = FederationProxyProviderUtil
+ .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
+ UserGroupInformation.getCurrentUser());
+
+ // client will retry until the rm becomes active.
+ GetClusterMetricsResponse response =
+ client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
+
+ // validate response
+ checkResponse(response);
+
+ // transition rm3 to standby
+ cluster.getResourceManager(2).getRMContext().getRMAdminService()
+ .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER));
+
+ // Transition rm2 to active.
+ makeRMActive(subClusterId, cluster, 1);
+ response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
+
+ // validate response
+ checkResponse(response);
+
+ cluster.stop();
+ }
+
+ private void checkResponse(GetClusterMetricsResponse response) {
+ Assert.assertNotNull(response.getClusterMetrics());
+ Assert.assertEquals(0,
+ response.getClusterMetrics().getNumActiveNodeManagers());
+ }
+
+ private void makeRMActive(final SubClusterId subClusterId,
+ final MiniYARNCluster cluster, final int index) {
+ try {
+ System.out.println("Transition rm" + (index + 1) + " to active");
+ String dummyAddress = "host:" + index;
+ cluster.getResourceManager(index).getRMContext().getRMAdminService()
+ .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER));
+ ResourceManager rm = cluster.getResourceManager(index);
+ InetSocketAddress amRMAddress =
+ rm.getApplicationMasterService().getBindAddress();
+ InetSocketAddress clientRMAddress =
+ rm.getClientRMService().getBindAddress();
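+ // Register the newly active RM's AM and client service addresses with the
+ // FederationStateStore so the failover proxy provider can discover it.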
+ SubClusterRegisterRequest request = SubClusterRegisterRequest
+ .newInstance(SubClusterInfo.newInstance(subClusterId,
+ amRMAddress.getAddress().getHostAddress() + ":"
+ + amRMAddress.getPort(),
+ clientRMAddress.getAddress().getHostAddress() + ":"
+ + clientRMAddress.getPort(),
+ dummyAddress, dummyAddress, SubClusterState.SC_NEW, 1,
+ dummyCapability));
+ stateStore.registerSubCluster(request);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
index 0232debb352..5b028e1925f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
@@ -84,7 +84,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
@Private
@Override
- protected InetSocketAddress getRMAddress(YarnConfiguration conf,
+ public InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) throws IOException {
if (protocol == ApplicationClientProtocol.class) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
@@ -111,7 +111,7 @@ public class ClientRMProxy extends RMProxy {
@Private
@Override
- protected void checkAllowedProtocols(Class<?> protocol) {
+ public void checkAllowedProtocols(Class<?> protocol) {
Preconditions.checkArgument(
protocol.isAssignableFrom(ClientRMProtocols.class),
"RM does not support this client protocol");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
index 8aa4107a98b..f7cb47a9dc8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
@@ -71,14 +71,14 @@ public class RMProxy<T> {
* Verify the passed protocol is supported.
*/
@Private
- protected void checkAllowedProtocols(Class<?> protocol) {}
+ public void checkAllowedProtocols(Class<?> protocol) {}
/**
* Get the ResourceManager address from the provided Configuration for the
* given protocol.
*/
@Private
- protected InetSocketAddress getRMAddress(
+ public InetSocketAddress getRMAddress(
YarnConfiguration conf, Class<?> protocol) throws IOException {
throw new UnsupportedOperationException("This method should be invoked " +
"from an instance of ClientRMProxy or ServerRMProxy");
@@ -97,7 +97,8 @@ public class RMProxy {
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration
: new YarnConfiguration(configuration);
- RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
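+ // The failover RetryPolicy is also needed when the federation failover flag
+ // is set, even if classic RM HA is not enabled.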
+ RetryPolicy retryPolicy = createRetryPolicy(conf,
+ (HAUtil.isHAEnabled(conf) || HAUtil.isFederationFailoverEnabled(conf)));
return newProxyInstance(conf, protocol, instance, retryPolicy);
}
@@ -123,7 +124,7 @@ public class RMProxy {
private static <T> T newProxyInstance(final YarnConfiguration conf,
final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy)
throws IOException{
- if (HAUtil.isHAEnabled(conf)) {
+ if (HAUtil.isHAEnabled(conf) || HAUtil.isFederationEnabled(conf)) {
RMFailoverProxyProvider<T> provider =
instance.createRMFailoverProxyProvider(conf, protocol);
return (T) RetryProxy.create(protocol, provider, retryPolicy);
@@ -140,7 +141,7 @@ public class RMProxy {
* RetryProxy.
*/
@Private
- T getProxy(final Configuration conf,
+ public T getProxy(final Configuration conf,
final Class<T> protocol, final InetSocketAddress rmAddress)
throws IOException {
return user.doAs(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0b0a1607efa..96d73bcd03b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2688,6 +2688,13 @@
+ <property>
+ <description>
+ Flag to indicate whether the RM is participating in Federation or not.
+ </description>
+ <name>yarn.federation.enabled</name>
+ <value>false</value>
+ </property>
Machine list file to be loaded by the FederationSubCluster Resolver
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index def53576be7..6cf41e7bcea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -105,12 +105,10 @@
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
- <version>${jcache.version}</version>
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
- <version>${ehcache.version}</version>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
index 3012be382cc..edec89f3025 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
@@ -71,7 +71,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
@InterfaceAudience.Private
@Override
- protected InetSocketAddress getRMAddress(YarnConfiguration conf,
+ public InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) {
if (protocol == ResourceTracker.class) {
return conf.getSocketAddr(
@@ -93,7 +93,7 @@ public class ServerRMProxy extends RMProxy {
@InterfaceAudience.Private
@Override
- protected void checkAllowedProtocols(Class<?> protocol) {
+ public void checkAllowedProtocols(Class<?> protocol) {
Preconditions.checkArgument(
protocol.isAssignableFrom(ResourceTracker.class),
"ResourceManager does not support this protocol");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
new file mode 100644
index 00000000000..a9860083ece
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.failover;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that creates proxies for the specified protocols when
+ * federation is enabled. The class creates a federation-aware failover
+ * provider, i.e. the failover provider uses the {@code FederationStateStore}
+ * to determine the currently active ResourceManager.
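+ *
+ * <p>A minimal usage sketch, mirroring the accompanying test; {@code conf} is
+ * assumed to be an existing YARN {@code Configuration} and the sub-cluster id
+ * is illustrative:
+ *
+ * <pre>{@code
+ * SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
+ * ApplicationClientProtocol client = FederationProxyProviderUtil
+ *     .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
+ *         UserGroupInformation.getCurrentUser());
+ * }</pre>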
+ */
+@Private
+@Unstable
+public final class FederationProxyProviderUtil {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(FederationProxyProviderUtil.class);
+
+ /**
+ * Create a proxy for the specified protocol. For non-HA, this is a direct
+ * connection to the ResourceManager address. When HA is enabled, the proxy
+ * handles the failover between the ResourceManagers as well.
+ *
+ * @param configuration Configuration to generate {@link ClientRMProxy}
+ * @param protocol Protocol for the proxy
+ * @param subClusterId the unique identifier of the sub-cluster
+ * @param user the user on whose behalf the proxy is being created
+ * @param <T> Type information of the proxy
+ * @return Proxy to the RM
+ * @throws IOException on failure
+ */
+ @Public
+ @Unstable
+ public static <T> T createRMProxy(Configuration configuration,
+ final Class<T> protocol, SubClusterId subClusterId,
+ UserGroupInformation user) throws IOException {
+ return createRMProxy(configuration, protocol, subClusterId, user, null);
+ }
+
+ /**
+ * Create a proxy for the specified protocol. For non-HA, this is a direct
+ * connection to the ResourceManager address. When HA is enabled, the proxy
+ * handles the failover between the ResourceManagers as well.
+ *
+ * @param configuration Configuration to generate {@link ClientRMProxy}
+ * @param protocol Protocol for the proxy
+ * @param subClusterId the unique identifier of the sub-cluster
+ * @param user the user on whose behalf the proxy is being created
+ * @param token the auth token to use for connection
+ * @param <T> Type information of the proxy
+ * @return Proxy to the RM
+ * @throws IOException on failure
+ */
+ @Public
+ @Unstable
+ @SuppressWarnings("unchecked")
+ public static <T> T createRMProxy(final Configuration configuration,
+ final Class<T> protocol, SubClusterId subClusterId,
+ UserGroupInformation user, final Token token) throws IOException {
+ try {
+ final YarnConfiguration conf = new YarnConfiguration(configuration);
+ updateConf(conf, subClusterId);
+ if (token != null) {
+ LOG.info(
+ "Creating RMProxy with a token: {} to subcluster: {}"
+ + " for protocol: {}",
+ token, subClusterId, protocol.getSimpleName());
+ user.addToken(token);
+ setAuthModeInConf(conf);
+ } else {
+ LOG.info("Creating RMProxy without a token to subcluster: {}"
+ + " for protocol: {}", subClusterId, protocol.getSimpleName());
+ }
+ final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
+ @Override
+ public T run() throws Exception {
+ return ClientRMProxy.createRMProxy(conf, protocol);
+ }
+ });
+
+ return proxyConnection;
+ } catch (IOException e) {
+ String message =
+ "Error while creating RM proxy to sub-cluster " + subClusterId
+ + " for user: " + user;
+ LOG.info(message);
+ throw new YarnRuntimeException(message, e);
+ } catch (InterruptedException e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ private static void setAuthModeInConf(Configuration conf) {
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ SaslRpcServer.AuthMethod.TOKEN.toString());
+ }
+
+ // Proxy creation is driven entirely by the conf, so update it with the
+ // federation settings for the target sub-cluster before creating the proxy.
+ private static void updateConf(Configuration conf,
+ SubClusterId subClusterId) {
+ conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId());
+ // In a Federation setting, we will connect to not just the local cluster RM
+ // but also multiple external RMs. The membership information of all the RMs
+ // that are currently participating in Federation is available in the central
+ // FederationStateStore. So we will:
+ // 1. obtain the RM service addresses from the FederationStateStore using the
+ // FederationRMFailoverProxyProvider;
+ // 2. disable traditional HA as that depends on local configuration lookup
+ // for RMs using indexes;
+ // 3. enable federation failover IF traditional HA is enabled so that the
+ // appropriate failover RetryPolicy is initialized.
+ conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+ conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
+ FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
+ if (HAUtil.isHAEnabled(conf)) {
+ conf.setBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false);
+ }
+ }
+
+ // disable instantiation
+ private FederationProxyProviderUtil() {
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
new file mode 100644
index 00000000000..c70362c75fe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.failover;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
+import org.apache.hadoop.yarn.client.RMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A FailoverProxyProvider implementation that uses the
+ * {@code FederationStateStore} to determine the ResourceManager to connect to.
+ * It supports both HA and non-HA ResourceManagers, as controlled by
+ * configuration.
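+ *
+ * <p>It is normally wired in by {@code FederationProxyProviderUtil}, which
+ * enables {@code yarn.federation.enabled} and registers this class as the
+ * client failover proxy provider on a cloned configuration.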
+ */
+@Private
+@Unstable
+public class FederationRMFailoverProxyProvider<T>
+ implements RMFailoverProxyProvider<T> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FederationRMFailoverProxyProvider.class);
+
+ private RMProxy<T> rmProxy;
+ private Class<T> protocol;
+ private T current;
+ private YarnConfiguration conf;
+ private FederationStateStoreFacade facade;
+ private SubClusterId subClusterId;
+ private Collection<Token<? extends TokenIdentifier>> originalTokens;
+ private boolean federationFailoverEnabled = false;
+
+ @Override
+ public void init(Configuration configuration, RMProxy<T> proxy,
+ Class<T> proto) {
+ this.rmProxy = proxy;
+ this.protocol = proto;
+ this.rmProxy.checkAllowedProtocols(this.protocol);
+ String clusterId =
+ configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
+ Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId");
+ this.subClusterId = SubClusterId.newInstance(clusterId);
+ this.facade = FederationStateStoreFacade.getInstance();
+ if (configuration instanceof YarnConfiguration) {
+ this.conf = (YarnConfiguration) configuration;
+ } else {
+ this.conf = new YarnConfiguration(configuration);
+ }
+ federationFailoverEnabled =
+ conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
+ YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
+
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+ conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES,
+ YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES));
+
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+ conf.getInt(
+ YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
+ YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
+
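+ // Capture the caller's tokens so they can be re-added to the current UGI
+ // each time a proxy is (re)created in getProxyInternal.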
+ try {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ originalTokens = currentUser.getTokens();
+ LOG.info("Initialized Federation proxy for user: {}",
+ currentUser.getUserName());
+ } catch (IOException e) {
+ LOG.warn("Could not get information of requester, ignoring for now.");
+ }
+
+ }
+
+ private void addOriginalTokens(UserGroupInformation currentUser) {
+ if (originalTokens == null || originalTokens.isEmpty()) {
+ return;
+ }
+ for (Token<? extends TokenIdentifier> token : originalTokens) {
+ currentUser.addToken(token);
+ }
+ }
+
+ private T getProxyInternal(boolean isFailover) {
+ SubClusterInfo subClusterInfo;
+ UserGroupInformation currentUser = null;
+ try {
+ LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
+ subClusterId);
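+ // On failover, the facade is asked to refresh the sub-cluster info from
+ // the FederationStateStore instead of serving it from its cache.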
+ subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
+ // Update the conf with the refreshed RM addresses, as proxy creation is
+ // driven by the conf.
+ updateRMAddress(subClusterInfo);
+ currentUser = UserGroupInformation.getCurrentUser();
+ addOriginalTokens(currentUser);
+ } catch (YarnException e) {
+ LOG.error("Exception while trying to create proxy to the ResourceManager"
+ + " for SubClusterId: {}", subClusterId, e);
+ return null;
+ } catch (IOException e) {
+ LOG.warn("Could not get information of requester, ignoring for now.");
+ }
+ try {
+ final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+ LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
+ protocol.getSimpleName(), currentUser);
+ LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
+ subClusterId);
+ return rmProxy.getProxy(conf, protocol, rmAddress);
+ } catch (IOException ioe) {
+ LOG.error(
+ "IOException while trying to create proxy to the ResourceManager"
+ + " for SubClusterId: {}",
+ subClusterId, ioe);
+ return null;
+ }
+ }
+
+ private void updateRMAddress(SubClusterInfo subClusterInfo) {
+ if (subClusterInfo != null) {
+ if (protocol == ApplicationClientProtocol.class) {
+ conf.set(YarnConfiguration.RM_ADDRESS,
+ subClusterInfo.getClientRMServiceAddress());
+ } else if (protocol == ApplicationMasterProtocol.class) {
+ conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ subClusterInfo.getAMRMServiceAddress());
+ } else if (protocol == ResourceManagerAdministrationProtocol.class) {
+ conf.set(YarnConfiguration.RM_ADMIN_ADDRESS,
+ subClusterInfo.getRMAdminServiceAddress());
+ }
+ }
+ }
+
+ @Override
+ public synchronized ProxyInfo<T> getProxy() {
+ if (current == null) {
+ current = getProxyInternal(false);
+ }
+ return new ProxyInfo<T>(current, subClusterId.getId());
+ }
+
+ @Override
+ public synchronized void performFailover(T currentProxy) {
+ closeInternal(currentProxy);
+ current = getProxyInternal(federationFailoverEnabled);
+ }
+
+ @Override
+ public Class<T> getInterface() {
+ return protocol;
+ }
+
+ private void closeInternal(T currentProxy) {
+ if (currentProxy == null) {
+ return;
+ }
+ if (currentProxy instanceof Closeable) {
+ try {
+ ((Closeable) currentProxy).close();
+ } catch (IOException e) {
+ LOG.warn("Exception while trying to close proxy", e);
+ }
+ } else {
+ RPC.stopProxy(currentProxy);
+ }
+ }
+
+ /**
+ * Close all the proxy objects which have been opened over the lifetime of
+ * this proxy provider.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ closeInternal(current);
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java
new file mode 100644
index 00000000000..b1baa0c251a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.failover;