YARN-3673. Create a FailoverProxy for Federation services. Contributed by Subru Krishnan

(cherry picked from commit 3307564a5f)
(cherry picked from commit bdfad4523f)
This commit is contained in:
Jian He 2016-08-22 14:43:07 +08:00 committed by Carlo Curino
parent 32a8618f39
commit 615c912b61
12 changed files with 603 additions and 14 deletions

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.conf; package org.apache.hadoop.yarn.conf;
import com.google.common.annotations.VisibleForTesting; import java.net.InetSocketAddress;
import java.util.Collection;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
@ -27,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.net.InetSocketAddress; import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
@InterfaceAudience.Private @InterfaceAudience.Private
public class HAUtil { public class HAUtil {
@ -44,6 +45,29 @@ public class HAUtil {
throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg); throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
} }
/**
* Returns true if Federation is configured.
*
* @param conf Configuration
* @return true if federation is configured in the configuration; else false.
*/
public static boolean isFederationEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
}
/**
* Returns true if RM failover is enabled in a Federation setting.
*
* @param conf Configuration
* @return if RM failover is enabled in conjunction with Federation in the
* configuration; else false.
*/
public static boolean isFederationFailoverEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
}
/** /**
* Returns true if Resource Manager HA is configured. * Returns true if Resource Manager HA is configured.
* *

View File

@ -2508,6 +2508,16 @@ public class YarnConfiguration extends Configuration {
public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation."; public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
public static final String FEDERATION_ENABLED = FEDERATION_PREFIX + "enabled";
public static final boolean DEFAULT_FEDERATION_ENABLED = false;
public static final String FEDERATION_FAILOVER_ENABLED =
FEDERATION_PREFIX + "failover.enabled";
public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
public static final String FEDERATION_SUBCLUSTER_ID =
FEDERATION_PREFIX + "sub-cluster.id";
public static final String FEDERATION_STATESTORE_CLIENT_CLASS = public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
FEDERATION_PREFIX + "state-store.class"; FEDERATION_PREFIX + "state-store.class";

View File

@ -73,6 +73,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
// Federation default configs to be ignored // Federation default configs to be ignored
configurationPropsToSkipCompare configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS); .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
// Ignore blacklisting nodes for AM failures feature since it is still a // Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress" // "work in progress"

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Unit tests for FederationRMFailoverProxyProvider.
*/
public class TestFederationRMFailoverProxyProvider {
private Configuration conf;
private FederationStateStore stateStore;
private final String dummyCapability = "cap";
@Before
public void setUp() throws IOException, YarnException {
conf = new YarnConfiguration();
stateStore = new MemoryFederationStateStore();
stateStore.init(conf);
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
}
@After
public void tearDown() throws Exception {
stateStore.close();
stateStore = null;
}
@Test
public void testFederationRMFailoverProxyProvider() throws Exception {
final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
final MiniYARNCluster cluster = new MiniYARNCluster(
"testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3");
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
2000);
HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
cluster.init(conf);
cluster.start();
// Transition rm3 to active;
makeRMActive(subClusterId, cluster, 2);
ApplicationClientProtocol client = FederationProxyProviderUtil
.createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
UserGroupInformation.getCurrentUser());
// client will retry until the rm becomes active.
GetClusterMetricsResponse response =
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
// validate response
checkResponse(response);
// transition rm3 to standby
cluster.getResourceManager(2).getRMContext().getRMAdminService()
.transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
// Transition rm2 to active;
makeRMActive(subClusterId, cluster, 1);
response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
// validate response
checkResponse(response);
cluster.stop();
}
private void checkResponse(GetClusterMetricsResponse response) {
Assert.assertNotNull(response.getClusterMetrics());
Assert.assertEquals(0,
response.getClusterMetrics().getNumActiveNodeManagers());
}
private void makeRMActive(final SubClusterId subClusterId,
final MiniYARNCluster cluster, final int index) {
try {
System.out.println("Transition rm" + (index + 1) + " to active");
String dummyAddress = "host:" + index;
cluster.getResourceManager(index).getRMContext().getRMAdminService()
.transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
ResourceManager rm = cluster.getResourceManager(index);
InetSocketAddress amRMAddress =
rm.getApplicationMasterService().getBindAddress();
InetSocketAddress clientRMAddress =
rm.getClientRMService().getBindAddress();
SubClusterRegisterRequest request = SubClusterRegisterRequest
.newInstance(SubClusterInfo.newInstance(subClusterId,
amRMAddress.getAddress().getHostAddress() + ":"
+ amRMAddress.getPort(),
clientRMAddress.getAddress().getHostAddress() + ":"
+ clientRMAddress.getPort(),
dummyAddress, dummyAddress, SubClusterState.SC_NEW, 1,
dummyCapability));
stateStore.registerSubCluster(request);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -84,7 +84,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
@Private @Private
@Override @Override
protected InetSocketAddress getRMAddress(YarnConfiguration conf, public InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) throws IOException { Class<?> protocol) throws IOException {
if (protocol == ApplicationClientProtocol.class) { if (protocol == ApplicationClientProtocol.class) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
@ -111,7 +111,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
@Private @Private
@Override @Override
protected void checkAllowedProtocols(Class<?> protocol) { public void checkAllowedProtocols(Class<?> protocol) {
Preconditions.checkArgument( Preconditions.checkArgument(
protocol.isAssignableFrom(ClientRMProtocols.class), protocol.isAssignableFrom(ClientRMProtocols.class),
"RM does not support this client protocol"); "RM does not support this client protocol");

View File

@ -71,14 +71,14 @@ public class RMProxy<T> {
* Verify the passed protocol is supported. * Verify the passed protocol is supported.
*/ */
@Private @Private
protected void checkAllowedProtocols(Class<?> protocol) {} public void checkAllowedProtocols(Class<?> protocol) {}
/** /**
* Get the ResourceManager address from the provided Configuration for the * Get the ResourceManager address from the provided Configuration for the
* given protocol. * given protocol.
*/ */
@Private @Private
protected InetSocketAddress getRMAddress( public InetSocketAddress getRMAddress(
YarnConfiguration conf, Class<?> protocol) throws IOException { YarnConfiguration conf, Class<?> protocol) throws IOException {
throw new UnsupportedOperationException("This method should be invoked " + throw new UnsupportedOperationException("This method should be invoked " +
"from an instance of ClientRMProxy or ServerRMProxy"); "from an instance of ClientRMProxy or ServerRMProxy");
@ -97,7 +97,8 @@ public class RMProxy<T> {
YarnConfiguration conf = (configuration instanceof YarnConfiguration) YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration ? (YarnConfiguration) configuration
: new YarnConfiguration(configuration); : new YarnConfiguration(configuration);
RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf)); RetryPolicy retryPolicy = createRetryPolicy(conf,
(HAUtil.isHAEnabled(conf) || HAUtil.isFederationFailoverEnabled(conf)));
return newProxyInstance(conf, protocol, instance, retryPolicy); return newProxyInstance(conf, protocol, instance, retryPolicy);
} }
@ -123,7 +124,7 @@ public class RMProxy<T> {
private static <T> T newProxyInstance(final YarnConfiguration conf, private static <T> T newProxyInstance(final YarnConfiguration conf,
final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy) final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy)
throws IOException{ throws IOException{
if (HAUtil.isHAEnabled(conf)) { if (HAUtil.isHAEnabled(conf) || HAUtil.isFederationEnabled(conf)) {
RMFailoverProxyProvider<T> provider = RMFailoverProxyProvider<T> provider =
instance.createRMFailoverProxyProvider(conf, protocol); instance.createRMFailoverProxyProvider(conf, protocol);
return (T) RetryProxy.create(protocol, provider, retryPolicy); return (T) RetryProxy.create(protocol, provider, retryPolicy);
@ -140,7 +141,7 @@ public class RMProxy<T> {
* RetryProxy. * RetryProxy.
*/ */
@Private @Private
<T> T getProxy(final Configuration conf, public <T> T getProxy(final Configuration conf,
final Class<T> protocol, final InetSocketAddress rmAddress) final Class<T> protocol, final InetSocketAddress rmAddress)
throws IOException { throws IOException {
return user.doAs( return user.doAs(

View File

@ -2619,6 +2619,13 @@
</property> </property>
<!-- Federation Configuration --> <!-- Federation Configuration -->
<property>
<description>
Flag to indicate whether the RM is participating in Federation or not.
</description>
<name>yarn.federation.enabled</name>
<value>false</value>
</property>
<property> <property>
<description> <description>
Machine list file to be loaded by the FederationSubCluster Resolver Machine list file to be loaded by the FederationSubCluster Resolver

View File

@ -112,12 +112,10 @@
<dependency> <dependency>
<groupId>javax.cache</groupId> <groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId> <artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.ehcache</groupId> <groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId> <artifactId>ehcache</artifactId>
<version>${ehcache.version}</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -71,7 +71,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
@InterfaceAudience.Private @InterfaceAudience.Private
@Override @Override
protected InetSocketAddress getRMAddress(YarnConfiguration conf, public InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) { Class<?> protocol) {
if (protocol == ResourceTracker.class) { if (protocol == ResourceTracker.class) {
return conf.getSocketAddr( return conf.getSocketAddr(
@ -93,7 +93,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
@InterfaceAudience.Private @InterfaceAudience.Private
@Override @Override
protected void checkAllowedProtocols(Class<?> protocol) { public void checkAllowedProtocols(Class<?> protocol) {
Preconditions.checkArgument( Preconditions.checkArgument(
protocol.isAssignableFrom(ResourceTracker.class), protocol.isAssignableFrom(ResourceTracker.class),
"ResourceManager does not support this protocol"); "ResourceManager does not support this protocol");

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.failover;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class that creates proxy for specified protocols when federation is
* enabled. The class creates a federation aware failover provider, i.e. the
* failover provider uses the {@code FederationStateStore} to determine the
* current active ResourceManager
*/
@Private
@Unstable
public final class FederationProxyProviderUtil {
public static final Logger LOG =
LoggerFactory.getLogger(FederationProxyProviderUtil.class);
/**
* Create a proxy for the specified protocol. For non-HA, this is a direct
* connection to the ResourceManager address. When HA is enabled, the proxy
* handles the failover between the ResourceManagers as well.
*
* @param configuration Configuration to generate {@link ClientRMProxy}
* @param protocol Protocol for the proxy
* @param subClusterId the unique identifier or the sub-cluster
* @param user the user on whose behalf the proxy is being created
* @param <T> Type information of the proxy
* @return Proxy to the RM
* @throws IOException on failure
*/
@Public
@Unstable
public static <T> T createRMProxy(Configuration configuration,
final Class<T> protocol, SubClusterId subClusterId,
UserGroupInformation user) throws IOException {
return createRMProxy(configuration, protocol, subClusterId, user, null);
}
/**
* Create a proxy for the specified protocol. For non-HA, this is a direct
* connection to the ResourceManager address. When HA is enabled, the proxy
* handles the failover between the ResourceManagers as well.
*
* @param configuration Configuration to generate {@link ClientRMProxy}
* @param protocol Protocol for the proxy
* @param subClusterId the unique identifier or the sub-cluster
* @param user the user on whose behalf the proxy is being created
* @param token the auth token to use for connection
* @param <T> Type information of the proxy
* @return Proxy to the RM
* @throws IOException on failure
*/
@Public
@Unstable
@SuppressWarnings("unchecked")
public static <T> T createRMProxy(final Configuration configuration,
final Class<T> protocol, SubClusterId subClusterId,
UserGroupInformation user, final Token token) throws IOException {
try {
final YarnConfiguration conf = new YarnConfiguration(configuration);
updateConf(conf, subClusterId);
if (token != null) {
LOG.info(
"Creating RMProxy with a token: {} to subcluster: {}"
+ " for protocol: {}",
token, subClusterId, protocol.getSimpleName());
user.addToken(token);
setAuthModeInConf(conf);
} else {
LOG.info("Creating RMProxy without a token to subcluster: {}"
+ " for protocol: {}", subClusterId, protocol.getSimpleName());
}
final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
return ClientRMProxy.createRMProxy(conf, protocol);
}
});
return proxyConnection;
} catch (IOException e) {
String message =
"Error while creating of RM application master service proxy for"
+ " appAttemptId: " + user;
LOG.info(message);
throw new YarnRuntimeException(message, e);
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
}
private static void setAuthModeInConf(Configuration conf) {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
SaslRpcServer.AuthMethod.TOKEN.toString());
}
// updating the conf with the refreshed RM addresses as proxy creations
// are based out of conf
private static void updateConf(Configuration conf,
SubClusterId subClusterId) {
conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId());
// In a Federation setting, we will connect to not just the local cluster RM
// but also multiple external RMs. The membership information of all the RMs
// that are currently
// participating in Federation is available in the central
// FederationStateStore.
// So we will:
// 1. obtain the RM service addresses from FederationStateStore using the
// FederationRMFailoverProxyProvider.
// 2. disable traditional HA as that depends on local configuration lookup
// for RMs using indexes.
// 3. we will enable federation failover IF traditional HA is enabled so
// that the appropriate failover RetryPolicy is initialized.
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
if (HAUtil.isHAEnabled(conf)) {
conf.setBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false);
}
}
// disable instantiation
private FederationProxyProviderUtil() {
}
}

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.failover;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* A FailoverProxyProvider implementation that uses the
* {@code FederationStateStore} to determine the ResourceManager to connect to.
* This supports both HA and regular mode which is controlled by configuration.
*/
@Private
@Unstable
public class FederationRMFailoverProxyProvider<T>
implements RMFailoverProxyProvider<T> {
private static final Logger LOG =
LoggerFactory.getLogger(FederationRMFailoverProxyProvider.class);
private RMProxy<T> rmProxy;
private Class<T> protocol;
private T current;
private YarnConfiguration conf;
private FederationStateStoreFacade facade;
private SubClusterId subClusterId;
private Collection<Token<? extends TokenIdentifier>> originalTokens;
private boolean federationFailoverEnabled = false;
@Override
public void init(Configuration configuration, RMProxy<T> proxy,
Class<T> proto) {
this.rmProxy = proxy;
this.protocol = proto;
this.rmProxy.checkAllowedProtocols(this.protocol);
String clusterId =
configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId");
this.subClusterId = SubClusterId.newInstance(clusterId);
this.facade = facade.getInstance();
if (configuration instanceof YarnConfiguration) {
this.conf = (YarnConfiguration) configuration;
}
federationFailoverEnabled =
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES,
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES));
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
conf.getInt(
YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
try {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
originalTokens = currentUser.getTokens();
LOG.info("Initialized Federation proxy for user: {}",
currentUser.getUserName());
} catch (IOException e) {
LOG.warn("Could not get information of requester, ignoring for now.");
}
}
private void addOriginalTokens(UserGroupInformation currentUser) {
if (originalTokens == null || originalTokens.isEmpty()) {
return;
}
for (Token<? extends TokenIdentifier> token : originalTokens) {
currentUser.addToken(token);
}
}
private T getProxyInternal(boolean isFailover) {
SubClusterInfo subClusterInfo;
UserGroupInformation currentUser = null;
try {
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
subClusterId);
subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
// updating the conf with the refreshed RM addresses as proxy
// creations
// are based out of conf
updateRMAddress(subClusterInfo);
currentUser = UserGroupInformation.getCurrentUser();
addOriginalTokens(currentUser);
} catch (YarnException e) {
LOG.error("Exception while trying to create proxy to the ResourceManager"
+ " for SubClusterId: {}", subClusterId, e);
return null;
} catch (IOException e) {
LOG.warn("Could not get information of requester, ignoring for now.");
}
try {
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
protocol.getSimpleName(), currentUser);
LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
subClusterId);
return rmProxy.getProxy(conf, protocol, rmAddress);
} catch (IOException ioe) {
LOG.error(
"IOException while trying to create proxy to the ResourceManager"
+ " for SubClusterId: {}",
subClusterId, ioe);
return null;
}
}
private void updateRMAddress(SubClusterInfo subClusterInfo) {
if (subClusterInfo != null) {
if (protocol == ApplicationClientProtocol.class) {
conf.set(YarnConfiguration.RM_ADDRESS,
subClusterInfo.getClientRMServiceAddress());
} else if (protocol == ApplicationMasterProtocol.class) {
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
subClusterInfo.getAMRMServiceAddress());
} else if (protocol == ResourceManagerAdministrationProtocol.class) {
conf.set(YarnConfiguration.RM_ADMIN_ADDRESS,
subClusterInfo.getRMAdminServiceAddress());
}
}
}
@Override
public synchronized ProxyInfo<T> getProxy() {
if (current == null) {
current = getProxyInternal(false);
}
return new ProxyInfo<T>(current, subClusterId.getId());
}
@Override
public synchronized void performFailover(T currentProxy) {
closeInternal(currentProxy);
current = getProxyInternal(federationFailoverEnabled);
}
@Override
public Class<T> getInterface() {
return protocol;
}
private void closeInternal(T currentProxy) {
if ((currentProxy != null) && (currentProxy instanceof Closeable)) {
try {
((Closeable) currentProxy).close();
} catch (IOException e) {
LOG.warn("Exception while trying to close proxy", e);
}
} else {
RPC.stopProxy(currentProxy);
}
}
/**
* Close all the proxy objects which have been opened over the lifetime of
* this proxy provider.
*/
@Override
public synchronized void close() throws IOException {
closeInternal(current);
}
}

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.failover;