From 3307564a5f8c8abc5fe84efcd05ee0f7dfdd921c Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 22 Aug 2016 14:43:07 +0800 Subject: [PATCH] YARN-3673. Create a FailoverProxy for Federation services. Contributed by Subru Krishnan --- .../org/apache/hadoop/yarn/conf/HAUtil.java | 30 ++- .../hadoop/yarn/conf/YarnConfiguration.java | 10 + .../conf/TestYarnConfigurationFields.java | 4 + ...TestFederationRMFailoverProxyProvider.java | 154 +++++++++++++ .../hadoop/yarn/client/ClientRMProxy.java | 4 +- .../apache/hadoop/yarn/client/RMProxy.java | 11 +- .../src/main/resources/yarn-default.xml | 7 + .../hadoop-yarn-server-common/pom.xml | 2 - .../hadoop/yarn/server/api/ServerRMProxy.java | 4 +- .../failover/FederationProxyProviderUtil.java | 163 ++++++++++++++ .../FederationRMFailoverProxyProvider.java | 211 ++++++++++++++++++ .../federation/failover/package-info.java | 17 ++ 12 files changed, 603 insertions(+), 14 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index 133b377c0b5..528b6421a3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.conf; -import com.google.common.annotations.VisibleForTesting; +import java.net.InetSocketAddress; +import java.util.Collection; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -27,8 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import java.net.InetSocketAddress; -import java.util.Collection; +import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private public class HAUtil { @@ -44,6 +45,29 @@ public class HAUtil { throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg); } + /** + * Returns true if Federation is configured. + * + * @param conf Configuration + * @return true if federation is configured in the configuration; else false. + */ + public static boolean isFederationEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED, + YarnConfiguration.DEFAULT_FEDERATION_ENABLED); + } + + /** + * Returns true if RM failover is enabled in a Federation setting. + * + * @param conf Configuration + * @return if RM failover is enabled in conjunction with Federation in the + * configuration; else false. + */ + public static boolean isFederationFailoverEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, + YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED); + } + /** * Returns true if Resource Manager HA is configured. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fe6c7b8ad2a..612d89b608b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2561,6 +2561,16 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation."; + public static final String FEDERATION_ENABLED = FEDERATION_PREFIX + "enabled"; + public static final boolean DEFAULT_FEDERATION_ENABLED = false; + + public static final String FEDERATION_FAILOVER_ENABLED = + FEDERATION_PREFIX + "failover.enabled"; + public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true; + + public static final String FEDERATION_SUBCLUSTER_ID = + FEDERATION_PREFIX + "sub-cluster.id"; + public static final String FEDERATION_STATESTORE_CLIENT_CLASS = FEDERATION_PREFIX + "state-store.class"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index bfc253451ca..c4d8f383513 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -71,6 +71,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { // Federation default configs to be ignored configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED); // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java new file mode 100644 index 00000000000..fa3523c9f70 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for FederationRMFailoverProxyProvider. + */ +public class TestFederationRMFailoverProxyProvider { + + private Configuration conf; + private FederationStateStore stateStore; + private final String dummyCapability = "cap"; + + @Before + public void setUp() throws IOException, YarnException { + conf = new YarnConfiguration(); + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf); + } + + @After + public void tearDown() throws Exception { + stateStore.close(); + stateStore = null; + } + + @Test + public void testFederationRMFailoverProxyProvider() throws Exception { + final SubClusterId subClusterId = SubClusterId.newInstance("SC-1"); + final MiniYARNCluster cluster = new MiniYARNCluster( + "testFederationRMFailoverProxyProvider", 3, 0, 1, 1); + + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3"); + + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + 2000); + + HATestUtil.setRpcAddressForRM("rm1", 10000, conf); + HATestUtil.setRpcAddressForRM("rm2", 20000, conf); + HATestUtil.setRpcAddressForRM("rm3", 30000, conf); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + + cluster.init(conf); + cluster.start(); + + // Transition rm3 to active; + makeRMActive(subClusterId, cluster, 2); + + ApplicationClientProtocol client = FederationProxyProviderUtil + .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId, + UserGroupInformation.getCurrentUser()); + + // client will retry until the rm becomes active. + GetClusterMetricsResponse response = + client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + + // validate response + checkResponse(response); + + // transition rm3 to standby + cluster.getResourceManager(2).getRMContext().getRMAdminService() + .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER)); + + // Transition rm2 to active; + makeRMActive(subClusterId, cluster, 1); + response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + + // validate response + checkResponse(response); + + cluster.stop(); + } + + private void checkResponse(GetClusterMetricsResponse response) { + Assert.assertNotNull(response.getClusterMetrics()); + Assert.assertEquals(0, + response.getClusterMetrics().getNumActiveNodeManagers()); + } + + private void makeRMActive(final SubClusterId subClusterId, + final MiniYARNCluster cluster, final int index) { + try { + System.out.println("Transition rm" + (index + 1) + " to active"); + String dummyAddress = "host:" + index; + cluster.getResourceManager(index).getRMContext().getRMAdminService() + .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER)); + ResourceManager rm = cluster.getResourceManager(index); + InetSocketAddress amRMAddress = + rm.getApplicationMasterService().getBindAddress(); + InetSocketAddress clientRMAddress = + rm.getClientRMService().getBindAddress(); + SubClusterRegisterRequest request = SubClusterRegisterRequest + .newInstance(SubClusterInfo.newInstance(subClusterId, + amRMAddress.getAddress().getHostAddress() + ":" + + amRMAddress.getPort(), + clientRMAddress.getAddress().getHostAddress() + ":" + + clientRMAddress.getPort(), + dummyAddress, dummyAddress, SubClusterState.SC_NEW, 1, + dummyCapability)); + stateStore.registerSubCluster(request); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index 0232debb352..5b028e1925f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -84,7 +84,7 @@ public class ClientRMProxy extends RMProxy { @Private @Override - protected InetSocketAddress getRMAddress(YarnConfiguration conf, + public InetSocketAddress getRMAddress(YarnConfiguration conf, Class protocol) throws IOException { if (protocol == ApplicationClientProtocol.class) { return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, @@ -111,7 +111,7 @@ public class ClientRMProxy extends RMProxy { @Private @Override - protected void checkAllowedProtocols(Class protocol) { + public void checkAllowedProtocols(Class protocol) { Preconditions.checkArgument( protocol.isAssignableFrom(ClientRMProtocols.class), "RM does not support this client protocol"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java index 8aa4107a98b..f7cb47a9dc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -71,14 +71,14 @@ public class RMProxy { * Verify the passed protocol is supported. */ @Private - protected void checkAllowedProtocols(Class protocol) {} + public void checkAllowedProtocols(Class protocol) {} /** * Get the ResourceManager address from the provided Configuration for the * given protocol. */ @Private - protected InetSocketAddress getRMAddress( + public InetSocketAddress getRMAddress( YarnConfiguration conf, Class protocol) throws IOException { throw new UnsupportedOperationException("This method should be invoked " + "from an instance of ClientRMProxy or ServerRMProxy"); @@ -97,7 +97,8 @@ public class RMProxy { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? (YarnConfiguration) configuration : new YarnConfiguration(configuration); - RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf)); + RetryPolicy retryPolicy = createRetryPolicy(conf, + (HAUtil.isHAEnabled(conf) || HAUtil.isFederationFailoverEnabled(conf))); return newProxyInstance(conf, protocol, instance, retryPolicy); } @@ -123,7 +124,7 @@ public class RMProxy { private static T newProxyInstance(final YarnConfiguration conf, final Class protocol, RMProxy instance, RetryPolicy retryPolicy) throws IOException{ - if (HAUtil.isHAEnabled(conf)) { + if (HAUtil.isHAEnabled(conf) || HAUtil.isFederationEnabled(conf)) { RMFailoverProxyProvider provider = instance.createRMFailoverProxyProvider(conf, protocol); return (T) RetryProxy.create(protocol, provider, retryPolicy); @@ -140,7 +141,7 @@ public class RMProxy { * RetryProxy. */ @Private - T getProxy(final Configuration conf, + public T getProxy(final Configuration conf, final Class protocol, final InetSocketAddress rmAddress) throws IOException { return user.doAs( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 0b0a1607efa..96d73bcd03b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2688,6 +2688,13 @@ + + + Flag to indicate whether the RM is participating in Federation or not. + + yarn.federation.enabled + false + Machine list file to be loaded by the FederationSubCluster Resolver diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index def53576be7..6cf41e7bcea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -105,12 +105,10 @@ javax.cache cache-api - ${jcache.version} org.ehcache ehcache - ${ehcache.version} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java index 3012be382cc..edec89f3025 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -71,7 +71,7 @@ public class ServerRMProxy extends RMProxy { @InterfaceAudience.Private @Override - protected InetSocketAddress getRMAddress(YarnConfiguration conf, + public InetSocketAddress getRMAddress(YarnConfiguration conf, Class protocol) { if (protocol == ResourceTracker.class) { return conf.getSocketAddr( @@ -93,7 +93,7 @@ public class ServerRMProxy extends RMProxy { @InterfaceAudience.Private @Override - protected void checkAllowedProtocols(Class protocol) { + public void checkAllowedProtocols(Class protocol) { Preconditions.checkArgument( protocol.isAssignableFrom(ResourceTracker.class), "ResourceManager does not support this protocol"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java new file mode 100644 index 00000000000..a9860083ece --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.failover; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.RMFailoverProxyProvider; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that creates proxy for specified protocols when federation is + * enabled. The class creates a federation aware failover provider, i.e. the + * failover provider uses the {@code FederationStateStore} to determine the + * current active ResourceManager + */ +@Private +@Unstable +public final class FederationProxyProviderUtil { + + public static final Logger LOG = + LoggerFactory.getLogger(FederationProxyProviderUtil.class); + + /** + * Create a proxy for the specified protocol. For non-HA, this is a direct + * connection to the ResourceManager address. When HA is enabled, the proxy + * handles the failover between the ResourceManagers as well. + * + * @param configuration Configuration to generate {@link ClientRMProxy} + * @param protocol Protocol for the proxy + * @param subClusterId the unique identifier or the sub-cluster + * @param user the user on whose behalf the proxy is being created + * @param Type information of the proxy + * @return Proxy to the RM + * @throws IOException on failure + */ + @Public + @Unstable + public static T createRMProxy(Configuration configuration, + final Class protocol, SubClusterId subClusterId, + UserGroupInformation user) throws IOException { + return createRMProxy(configuration, protocol, subClusterId, user, null); + } + + /** + * Create a proxy for the specified protocol. For non-HA, this is a direct + * connection to the ResourceManager address. When HA is enabled, the proxy + * handles the failover between the ResourceManagers as well. + * + * @param configuration Configuration to generate {@link ClientRMProxy} + * @param protocol Protocol for the proxy + * @param subClusterId the unique identifier or the sub-cluster + * @param user the user on whose behalf the proxy is being created + * @param token the auth token to use for connection + * @param Type information of the proxy + * @return Proxy to the RM + * @throws IOException on failure + */ + @Public + @Unstable + @SuppressWarnings("unchecked") + public static T createRMProxy(final Configuration configuration, + final Class protocol, SubClusterId subClusterId, + UserGroupInformation user, final Token token) throws IOException { + try { + final YarnConfiguration conf = new YarnConfiguration(configuration); + updateConf(conf, subClusterId); + if (token != null) { + LOG.info( + "Creating RMProxy with a token: {} to subcluster: {}" + + " for protocol: {}", + token, subClusterId, protocol.getSimpleName()); + user.addToken(token); + setAuthModeInConf(conf); + } else { + LOG.info("Creating RMProxy without a token to subcluster: {}" + + " for protocol: {}", subClusterId, protocol.getSimpleName()); + } + final T proxyConnection = user.doAs(new PrivilegedExceptionAction() { + @Override + public T run() throws Exception { + return ClientRMProxy.createRMProxy(conf, protocol); + } + }); + + return proxyConnection; + } catch (IOException e) { + String message = + "Error while creating of RM application master service proxy for" + + " appAttemptId: " + user; + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (InterruptedException e) { + throw new YarnRuntimeException(e); + } + } + + private static void setAuthModeInConf(Configuration conf) { + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); + } + + // updating the conf with the refreshed RM addresses as proxy creations + // are based out of conf + private static void updateConf(Configuration conf, + SubClusterId subClusterId) { + conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId()); + // In a Federation setting, we will connect to not just the local cluster RM + // but also multiple external RMs. The membership information of all the RMs + // that are currently + // participating in Federation is available in the central + // FederationStateStore. + // So we will: + // 1. obtain the RM service addresses from FederationStateStore using the + // FederationRMFailoverProxyProvider. + // 2. disable traditional HA as that depends on local configuration lookup + // for RMs using indexes. + // 3. we will enable federation failover IF traditional HA is enabled so + // that the appropriate failover RetryPolicy is initialized. + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER, + FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class); + if (HAUtil.isHAEnabled(conf)) { + conf.setBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false); + } + } + + // disable instantiation + private FederationProxyProviderUtil() { + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java new file mode 100644 index 00000000000..c70362c75fe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.failover; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.client.RMFailoverProxyProvider; +import org.apache.hadoop.yarn.client.RMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A FailoverProxyProvider implementation that uses the + * {@code FederationStateStore} to determine the ResourceManager to connect to. + * This supports both HA and regular mode which is controlled by configuration. + */ +@Private +@Unstable +public class FederationRMFailoverProxyProvider + implements RMFailoverProxyProvider { + private static final Logger LOG = + LoggerFactory.getLogger(FederationRMFailoverProxyProvider.class); + + private RMProxy rmProxy; + private Class protocol; + private T current; + private YarnConfiguration conf; + private FederationStateStoreFacade facade; + private SubClusterId subClusterId; + private Collection> originalTokens; + private boolean federationFailoverEnabled = false; + + @Override + public void init(Configuration configuration, RMProxy proxy, + Class proto) { + this.rmProxy = proxy; + this.protocol = proto; + this.rmProxy.checkAllowedProtocols(this.protocol); + String clusterId = + configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID); + Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId"); + this.subClusterId = SubClusterId.newInstance(clusterId); + this.facade = facade.getInstance(); + if (configuration instanceof YarnConfiguration) { + this.conf = (YarnConfiguration) configuration; + } + federationFailoverEnabled = + conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, + YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED); + + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, + YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES)); + + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + conf.getInt( + YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS, + YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS)); + + try { + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + originalTokens = currentUser.getTokens(); + LOG.info("Initialized Federation proxy for user: {}", + currentUser.getUserName()); + } catch (IOException e) { + LOG.warn("Could not get information of requester, ignoring for now."); + } + + } + + private void addOriginalTokens(UserGroupInformation currentUser) { + if (originalTokens == null || originalTokens.isEmpty()) { + return; + } + for (Token token : originalTokens) { + currentUser.addToken(token); + } + } + + private T getProxyInternal(boolean isFailover) { + SubClusterInfo subClusterInfo; + UserGroupInformation currentUser = null; + try { + LOG.info("Failing over to the ResourceManager for SubClusterId: {}", + subClusterId); + subClusterInfo = facade.getSubCluster(subClusterId, isFailover); + // updating the conf with the refreshed RM addresses as proxy + // creations + // are based out of conf + updateRMAddress(subClusterInfo); + currentUser = UserGroupInformation.getCurrentUser(); + addOriginalTokens(currentUser); + } catch (YarnException e) { + LOG.error("Exception while trying to create proxy to the ResourceManager" + + " for SubClusterId: {}", subClusterId, e); + return null; + } catch (IOException e) { + LOG.warn("Could not get information of requester, ignoring for now."); + } + try { + final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol); + LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress, + protocol.getSimpleName(), currentUser); + LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress, + subClusterId); + return rmProxy.getProxy(conf, protocol, rmAddress); + } catch (IOException ioe) { + LOG.error( + "IOException while trying to create proxy to the ResourceManager" + + " for SubClusterId: {}", + subClusterId, ioe); + return null; + } + } + + private void updateRMAddress(SubClusterInfo subClusterInfo) { + if (subClusterInfo != null) { + if (protocol == ApplicationClientProtocol.class) { + conf.set(YarnConfiguration.RM_ADDRESS, + subClusterInfo.getClientRMServiceAddress()); + } else if (protocol == ApplicationMasterProtocol.class) { + conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, + subClusterInfo.getAMRMServiceAddress()); + } else if (protocol == ResourceManagerAdministrationProtocol.class) { + conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, + subClusterInfo.getRMAdminServiceAddress()); + } + } + } + + @Override + public synchronized ProxyInfo getProxy() { + if (current == null) { + current = getProxyInternal(false); + } + return new ProxyInfo(current, subClusterId.getId()); + } + + @Override + public synchronized void performFailover(T currentProxy) { + closeInternal(currentProxy); + current = getProxyInternal(federationFailoverEnabled); + } + + @Override + public Class getInterface() { + return protocol; + } + + private void closeInternal(T currentProxy) { + if ((currentProxy != null) && (currentProxy instanceof Closeable)) { + try { + ((Closeable) currentProxy).close(); + } catch (IOException e) { + LOG.warn("Exception while trying to close proxy", e); + } + } else { + RPC.stopProxy(currentProxy); + } + + } + + /** + * Close all the proxy objects which have been opened over the lifetime of + * this proxy provider. + */ + @Override + public synchronized void close() throws IOException { + closeInternal(current); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java new file mode 100644 index 00000000000..b1baa0c251a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.failover;