YARN-3673. Create a FailoverProxy for Federation services. Contributed by Subru Krishnan
(cherry picked from commit 3307564a5f
)
This commit is contained in:
parent
cfafd173bd
commit
bdfad4523f
|
@ -18,7 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.yarn.conf;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
|
@ -27,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class HAUtil {
|
||||
|
@ -44,6 +45,29 @@ public class HAUtil {
|
|||
throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if Federation is configured.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @return true if federation is configured in the configuration; else false.
|
||||
*/
|
||||
public static boolean isFederationEnabled(Configuration conf) {
|
||||
return conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if RM failover is enabled in a Federation setting.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @return if RM failover is enabled in conjunction with Federation in the
|
||||
* configuration; else false.
|
||||
*/
|
||||
public static boolean isFederationFailoverEnabled(Configuration conf) {
|
||||
return conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if Resource Manager HA is configured.
|
||||
*
|
||||
|
|
|
@ -2561,6 +2561,16 @@ public class YarnConfiguration extends Configuration {
|
|||
|
||||
public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
|
||||
|
||||
public static final String FEDERATION_ENABLED = FEDERATION_PREFIX + "enabled";
|
||||
public static final boolean DEFAULT_FEDERATION_ENABLED = false;
|
||||
|
||||
public static final String FEDERATION_FAILOVER_ENABLED =
|
||||
FEDERATION_PREFIX + "failover.enabled";
|
||||
public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
|
||||
|
||||
public static final String FEDERATION_SUBCLUSTER_ID =
|
||||
FEDERATION_PREFIX + "sub-cluster.id";
|
||||
|
||||
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
|
||||
FEDERATION_PREFIX + "state-store.class";
|
||||
|
||||
|
|
|
@ -71,6 +71,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
|
|||
// Federation default configs to be ignored
|
||||
configurationPropsToSkipCompare
|
||||
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
|
||||
configurationPropsToSkipCompare
|
||||
.add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
|
||||
configurationPropsToSkipCompare
|
||||
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
|
||||
|
||||
// Ignore blacklisting nodes for AM failures feature since it is still a
|
||||
// "work in progress"
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Unit tests for FederationRMFailoverProxyProvider.
|
||||
*/
|
||||
public class TestFederationRMFailoverProxyProvider {
|
||||
|
||||
private Configuration conf;
|
||||
private FederationStateStore stateStore;
|
||||
private final String dummyCapability = "cap";
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException, YarnException {
|
||||
conf = new YarnConfiguration();
|
||||
stateStore = new MemoryFederationStateStore();
|
||||
stateStore.init(conf);
|
||||
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
stateStore.close();
|
||||
stateStore = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFederationRMFailoverProxyProvider() throws Exception {
|
||||
final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
|
||||
final MiniYARNCluster cluster = new MiniYARNCluster(
|
||||
"testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
|
||||
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3");
|
||||
|
||||
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||
2000);
|
||||
|
||||
HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
|
||||
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
|
||||
// Transition rm3 to active;
|
||||
makeRMActive(subClusterId, cluster, 2);
|
||||
|
||||
ApplicationClientProtocol client = FederationProxyProviderUtil
|
||||
.createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
|
||||
// client will retry until the rm becomes active.
|
||||
GetClusterMetricsResponse response =
|
||||
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
|
||||
|
||||
// validate response
|
||||
checkResponse(response);
|
||||
|
||||
// transition rm3 to standby
|
||||
cluster.getResourceManager(2).getRMContext().getRMAdminService()
|
||||
.transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
|
||||
|
||||
// Transition rm2 to active;
|
||||
makeRMActive(subClusterId, cluster, 1);
|
||||
response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
|
||||
|
||||
// validate response
|
||||
checkResponse(response);
|
||||
|
||||
cluster.stop();
|
||||
}
|
||||
|
||||
private void checkResponse(GetClusterMetricsResponse response) {
|
||||
Assert.assertNotNull(response.getClusterMetrics());
|
||||
Assert.assertEquals(0,
|
||||
response.getClusterMetrics().getNumActiveNodeManagers());
|
||||
}
|
||||
|
||||
private void makeRMActive(final SubClusterId subClusterId,
|
||||
final MiniYARNCluster cluster, final int index) {
|
||||
try {
|
||||
System.out.println("Transition rm" + (index + 1) + " to active");
|
||||
String dummyAddress = "host:" + index;
|
||||
cluster.getResourceManager(index).getRMContext().getRMAdminService()
|
||||
.transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
|
||||
ResourceManager rm = cluster.getResourceManager(index);
|
||||
InetSocketAddress amRMAddress =
|
||||
rm.getApplicationMasterService().getBindAddress();
|
||||
InetSocketAddress clientRMAddress =
|
||||
rm.getClientRMService().getBindAddress();
|
||||
SubClusterRegisterRequest request = SubClusterRegisterRequest
|
||||
.newInstance(SubClusterInfo.newInstance(subClusterId,
|
||||
amRMAddress.getAddress().getHostAddress() + ":"
|
||||
+ amRMAddress.getPort(),
|
||||
clientRMAddress.getAddress().getHostAddress() + ":"
|
||||
+ clientRMAddress.getPort(),
|
||||
dummyAddress, dummyAddress, SubClusterState.SC_NEW, 1,
|
||||
dummyCapability));
|
||||
stateStore.registerSubCluster(request);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -84,7 +84,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
|
|||
|
||||
@Private
|
||||
@Override
|
||||
protected InetSocketAddress getRMAddress(YarnConfiguration conf,
|
||||
public InetSocketAddress getRMAddress(YarnConfiguration conf,
|
||||
Class<?> protocol) throws IOException {
|
||||
if (protocol == ApplicationClientProtocol.class) {
|
||||
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
|
||||
|
@ -111,7 +111,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
|
|||
|
||||
@Private
|
||||
@Override
|
||||
protected void checkAllowedProtocols(Class<?> protocol) {
|
||||
public void checkAllowedProtocols(Class<?> protocol) {
|
||||
Preconditions.checkArgument(
|
||||
protocol.isAssignableFrom(ClientRMProtocols.class),
|
||||
"RM does not support this client protocol");
|
||||
|
|
|
@ -71,14 +71,14 @@ public class RMProxy<T> {
|
|||
* Verify the passed protocol is supported.
|
||||
*/
|
||||
@Private
|
||||
protected void checkAllowedProtocols(Class<?> protocol) {}
|
||||
public void checkAllowedProtocols(Class<?> protocol) {}
|
||||
|
||||
/**
|
||||
* Get the ResourceManager address from the provided Configuration for the
|
||||
* given protocol.
|
||||
*/
|
||||
@Private
|
||||
protected InetSocketAddress getRMAddress(
|
||||
public InetSocketAddress getRMAddress(
|
||||
YarnConfiguration conf, Class<?> protocol) throws IOException {
|
||||
throw new UnsupportedOperationException("This method should be invoked " +
|
||||
"from an instance of ClientRMProxy or ServerRMProxy");
|
||||
|
@ -97,7 +97,8 @@ public class RMProxy<T> {
|
|||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||
? (YarnConfiguration) configuration
|
||||
: new YarnConfiguration(configuration);
|
||||
RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
|
||||
RetryPolicy retryPolicy = createRetryPolicy(conf,
|
||||
(HAUtil.isHAEnabled(conf) || HAUtil.isFederationFailoverEnabled(conf)));
|
||||
return newProxyInstance(conf, protocol, instance, retryPolicy);
|
||||
}
|
||||
|
||||
|
@ -123,7 +124,7 @@ public class RMProxy<T> {
|
|||
private static <T> T newProxyInstance(final YarnConfiguration conf,
|
||||
final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy)
|
||||
throws IOException{
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
if (HAUtil.isHAEnabled(conf) || HAUtil.isFederationEnabled(conf)) {
|
||||
RMFailoverProxyProvider<T> provider =
|
||||
instance.createRMFailoverProxyProvider(conf, protocol);
|
||||
return (T) RetryProxy.create(protocol, provider, retryPolicy);
|
||||
|
@ -140,7 +141,7 @@ public class RMProxy<T> {
|
|||
* RetryProxy.
|
||||
*/
|
||||
@Private
|
||||
<T> T getProxy(final Configuration conf,
|
||||
public <T> T getProxy(final Configuration conf,
|
||||
final Class<T> protocol, final InetSocketAddress rmAddress)
|
||||
throws IOException {
|
||||
return user.doAs(
|
||||
|
|
|
@ -2688,6 +2688,13 @@
|
|||
</property>
|
||||
|
||||
<!-- Federation Configuration -->
|
||||
<property>
|
||||
<description>
|
||||
Flag to indicate whether the RM is participating in Federation or not.
|
||||
</description>
|
||||
<name>yarn.federation.enabled</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
<property>
|
||||
<description>
|
||||
Machine list file to be loaded by the FederationSubCluster Resolver
|
||||
|
|
|
@ -105,12 +105,10 @@
|
|||
<dependency>
|
||||
<groupId>javax.cache</groupId>
|
||||
<artifactId>cache-api</artifactId>
|
||||
<version>${jcache.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.ehcache</groupId>
|
||||
<artifactId>ehcache</artifactId>
|
||||
<version>${ehcache.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
|
|||
|
||||
@InterfaceAudience.Private
|
||||
@Override
|
||||
protected InetSocketAddress getRMAddress(YarnConfiguration conf,
|
||||
public InetSocketAddress getRMAddress(YarnConfiguration conf,
|
||||
Class<?> protocol) {
|
||||
if (protocol == ResourceTracker.class) {
|
||||
return conf.getSocketAddr(
|
||||
|
@ -93,7 +93,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
|
|||
|
||||
@InterfaceAudience.Private
|
||||
@Override
|
||||
protected void checkAllowedProtocols(Class<?> protocol) {
|
||||
public void checkAllowedProtocols(Class<?> protocol) {
|
||||
Preconditions.checkArgument(
|
||||
protocol.isAssignableFrom(ResourceTracker.class),
|
||||
"ResourceManager does not support this protocol");
|
||||
|
|
|
@ -0,0 +1,163 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.failover;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.security.SaslRpcServer;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Utility class that creates proxy for specified protocols when federation is
|
||||
* enabled. The class creates a federation aware failover provider, i.e. the
|
||||
* failover provider uses the {@code FederationStateStore} to determine the
|
||||
* current active ResourceManager
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public final class FederationProxyProviderUtil {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(FederationProxyProviderUtil.class);
|
||||
|
||||
/**
|
||||
* Create a proxy for the specified protocol. For non-HA, this is a direct
|
||||
* connection to the ResourceManager address. When HA is enabled, the proxy
|
||||
* handles the failover between the ResourceManagers as well.
|
||||
*
|
||||
* @param configuration Configuration to generate {@link ClientRMProxy}
|
||||
* @param protocol Protocol for the proxy
|
||||
* @param subClusterId the unique identifier or the sub-cluster
|
||||
* @param user the user on whose behalf the proxy is being created
|
||||
* @param <T> Type information of the proxy
|
||||
* @return Proxy to the RM
|
||||
* @throws IOException on failure
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public static <T> T createRMProxy(Configuration configuration,
|
||||
final Class<T> protocol, SubClusterId subClusterId,
|
||||
UserGroupInformation user) throws IOException {
|
||||
return createRMProxy(configuration, protocol, subClusterId, user, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a proxy for the specified protocol. For non-HA, this is a direct
|
||||
* connection to the ResourceManager address. When HA is enabled, the proxy
|
||||
* handles the failover between the ResourceManagers as well.
|
||||
*
|
||||
* @param configuration Configuration to generate {@link ClientRMProxy}
|
||||
* @param protocol Protocol for the proxy
|
||||
* @param subClusterId the unique identifier or the sub-cluster
|
||||
* @param user the user on whose behalf the proxy is being created
|
||||
* @param token the auth token to use for connection
|
||||
* @param <T> Type information of the proxy
|
||||
* @return Proxy to the RM
|
||||
* @throws IOException on failure
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> T createRMProxy(final Configuration configuration,
|
||||
final Class<T> protocol, SubClusterId subClusterId,
|
||||
UserGroupInformation user, final Token token) throws IOException {
|
||||
try {
|
||||
final YarnConfiguration conf = new YarnConfiguration(configuration);
|
||||
updateConf(conf, subClusterId);
|
||||
if (token != null) {
|
||||
LOG.info(
|
||||
"Creating RMProxy with a token: {} to subcluster: {}"
|
||||
+ " for protocol: {}",
|
||||
token, subClusterId, protocol.getSimpleName());
|
||||
user.addToken(token);
|
||||
setAuthModeInConf(conf);
|
||||
} else {
|
||||
LOG.info("Creating RMProxy without a token to subcluster: {}"
|
||||
+ " for protocol: {}", subClusterId, protocol.getSimpleName());
|
||||
}
|
||||
final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
|
||||
@Override
|
||||
public T run() throws Exception {
|
||||
return ClientRMProxy.createRMProxy(conf, protocol);
|
||||
}
|
||||
});
|
||||
|
||||
return proxyConnection;
|
||||
} catch (IOException e) {
|
||||
String message =
|
||||
"Error while creating of RM application master service proxy for"
|
||||
+ " appAttemptId: " + user;
|
||||
LOG.info(message);
|
||||
throw new YarnRuntimeException(message, e);
|
||||
} catch (InterruptedException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void setAuthModeInConf(Configuration conf) {
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
SaslRpcServer.AuthMethod.TOKEN.toString());
|
||||
}
|
||||
|
||||
// updating the conf with the refreshed RM addresses as proxy creations
|
||||
// are based out of conf
|
||||
private static void updateConf(Configuration conf,
|
||||
SubClusterId subClusterId) {
|
||||
conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId());
|
||||
// In a Federation setting, we will connect to not just the local cluster RM
|
||||
// but also multiple external RMs. The membership information of all the RMs
|
||||
// that are currently
|
||||
// participating in Federation is available in the central
|
||||
// FederationStateStore.
|
||||
// So we will:
|
||||
// 1. obtain the RM service addresses from FederationStateStore using the
|
||||
// FederationRMFailoverProxyProvider.
|
||||
// 2. disable traditional HA as that depends on local configuration lookup
|
||||
// for RMs using indexes.
|
||||
// 3. we will enable federation failover IF traditional HA is enabled so
|
||||
// that the appropriate failover RetryPolicy is initialized.
|
||||
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
|
||||
FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
conf.setBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false);
|
||||
}
|
||||
}
|
||||
|
||||
// disable instantiation
|
||||
private FederationProxyProviderUtil() {
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,211 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.failover;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
|
||||
import org.apache.hadoop.yarn.client.RMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* A FailoverProxyProvider implementation that uses the
|
||||
* {@code FederationStateStore} to determine the ResourceManager to connect to.
|
||||
* This supports both HA and regular mode which is controlled by configuration.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class FederationRMFailoverProxyProvider<T>
|
||||
implements RMFailoverProxyProvider<T> {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FederationRMFailoverProxyProvider.class);
|
||||
|
||||
private RMProxy<T> rmProxy;
|
||||
private Class<T> protocol;
|
||||
private T current;
|
||||
private YarnConfiguration conf;
|
||||
private FederationStateStoreFacade facade;
|
||||
private SubClusterId subClusterId;
|
||||
private Collection<Token<? extends TokenIdentifier>> originalTokens;
|
||||
private boolean federationFailoverEnabled = false;
|
||||
|
||||
@Override
|
||||
public void init(Configuration configuration, RMProxy<T> proxy,
|
||||
Class<T> proto) {
|
||||
this.rmProxy = proxy;
|
||||
this.protocol = proto;
|
||||
this.rmProxy.checkAllowedProtocols(this.protocol);
|
||||
String clusterId =
|
||||
configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
|
||||
Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId");
|
||||
this.subClusterId = SubClusterId.newInstance(clusterId);
|
||||
this.facade = facade.getInstance();
|
||||
if (configuration instanceof YarnConfiguration) {
|
||||
this.conf = (YarnConfiguration) configuration;
|
||||
}
|
||||
federationFailoverEnabled =
|
||||
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
|
||||
|
||||
conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||
conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES,
|
||||
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES));
|
||||
|
||||
conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
conf.getInt(
|
||||
YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
|
||||
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
|
||||
|
||||
try {
|
||||
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
|
||||
originalTokens = currentUser.getTokens();
|
||||
LOG.info("Initialized Federation proxy for user: {}",
|
||||
currentUser.getUserName());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Could not get information of requester, ignoring for now.");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void addOriginalTokens(UserGroupInformation currentUser) {
|
||||
if (originalTokens == null || originalTokens.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (Token<? extends TokenIdentifier> token : originalTokens) {
|
||||
currentUser.addToken(token);
|
||||
}
|
||||
}
|
||||
|
||||
private T getProxyInternal(boolean isFailover) {
|
||||
SubClusterInfo subClusterInfo;
|
||||
UserGroupInformation currentUser = null;
|
||||
try {
|
||||
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
|
||||
subClusterId);
|
||||
subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
|
||||
// updating the conf with the refreshed RM addresses as proxy
|
||||
// creations
|
||||
// are based out of conf
|
||||
updateRMAddress(subClusterInfo);
|
||||
currentUser = UserGroupInformation.getCurrentUser();
|
||||
addOriginalTokens(currentUser);
|
||||
} catch (YarnException e) {
|
||||
LOG.error("Exception while trying to create proxy to the ResourceManager"
|
||||
+ " for SubClusterId: {}", subClusterId, e);
|
||||
return null;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Could not get information of requester, ignoring for now.");
|
||||
}
|
||||
try {
|
||||
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
||||
LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
|
||||
protocol.getSimpleName(), currentUser);
|
||||
LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
|
||||
subClusterId);
|
||||
return rmProxy.getProxy(conf, protocol, rmAddress);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error(
|
||||
"IOException while trying to create proxy to the ResourceManager"
|
||||
+ " for SubClusterId: {}",
|
||||
subClusterId, ioe);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void updateRMAddress(SubClusterInfo subClusterInfo) {
|
||||
if (subClusterInfo != null) {
|
||||
if (protocol == ApplicationClientProtocol.class) {
|
||||
conf.set(YarnConfiguration.RM_ADDRESS,
|
||||
subClusterInfo.getClientRMServiceAddress());
|
||||
} else if (protocol == ApplicationMasterProtocol.class) {
|
||||
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
subClusterInfo.getAMRMServiceAddress());
|
||||
} else if (protocol == ResourceManagerAdministrationProtocol.class) {
|
||||
conf.set(YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||
subClusterInfo.getRMAdminServiceAddress());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ProxyInfo<T> getProxy() {
|
||||
if (current == null) {
|
||||
current = getProxyInternal(false);
|
||||
}
|
||||
return new ProxyInfo<T>(current, subClusterId.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void performFailover(T currentProxy) {
|
||||
closeInternal(currentProxy);
|
||||
current = getProxyInternal(federationFailoverEnabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<T> getInterface() {
|
||||
return protocol;
|
||||
}
|
||||
|
||||
private void closeInternal(T currentProxy) {
|
||||
if ((currentProxy != null) && (currentProxy instanceof Closeable)) {
|
||||
try {
|
||||
((Closeable) currentProxy).close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Exception while trying to close proxy", e);
|
||||
}
|
||||
} else {
|
||||
RPC.stopProxy(currentProxy);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all the proxy objects which have been opened over the lifetime of
|
||||
* this proxy provider.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
closeInternal(current);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.federation.failover;
|
Loading…
Reference in New Issue