YARN-3673. Create a FailoverProxy for Federation services. Contributed by Subru Krishnan
This commit is contained in:
parent
bd44182e70
commit
3307564a5f
|
@ -18,7 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.conf;
|
package org.apache.hadoop.yarn.conf;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
@ -27,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class HAUtil {
|
public class HAUtil {
|
||||||
|
@ -44,6 +45,29 @@ public class HAUtil {
|
||||||
throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
|
throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if Federation is configured.
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @return true if federation is configured in the configuration; else false.
|
||||||
|
*/
|
||||||
|
public static boolean isFederationEnabled(Configuration conf) {
|
||||||
|
return conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if RM failover is enabled in a Federation setting.
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @return if RM failover is enabled in conjunction with Federation in the
|
||||||
|
* configuration; else false.
|
||||||
|
*/
|
||||||
|
public static boolean isFederationFailoverEnabled(Configuration conf) {
|
||||||
|
return conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if Resource Manager HA is configured.
|
* Returns true if Resource Manager HA is configured.
|
||||||
*
|
*
|
||||||
|
|
|
@ -2561,6 +2561,16 @@ public class YarnConfiguration extends Configuration {
|
||||||
|
|
||||||
public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
|
public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
|
||||||
|
|
||||||
|
public static final String FEDERATION_ENABLED = FEDERATION_PREFIX + "enabled";
|
||||||
|
public static final boolean DEFAULT_FEDERATION_ENABLED = false;
|
||||||
|
|
||||||
|
public static final String FEDERATION_FAILOVER_ENABLED =
|
||||||
|
FEDERATION_PREFIX + "failover.enabled";
|
||||||
|
public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
|
||||||
|
|
||||||
|
public static final String FEDERATION_SUBCLUSTER_ID =
|
||||||
|
FEDERATION_PREFIX + "sub-cluster.id";
|
||||||
|
|
||||||
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
|
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
|
||||||
FEDERATION_PREFIX + "state-store.class";
|
FEDERATION_PREFIX + "state-store.class";
|
||||||
|
|
||||||
|
|
|
@ -71,6 +71,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
|
||||||
// Federation default configs to be ignored
|
// Federation default configs to be ignored
|
||||||
configurationPropsToSkipCompare
|
configurationPropsToSkipCompare
|
||||||
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
|
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
|
||||||
|
configurationPropsToSkipCompare
|
||||||
|
.add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
|
||||||
|
configurationPropsToSkipCompare
|
||||||
|
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
|
||||||
|
|
||||||
// Ignore blacklisting nodes for AM failures feature since it is still a
|
// Ignore blacklisting nodes for AM failures feature since it is still a
|
||||||
// "work in progress"
|
// "work in progress"
|
||||||
|
|
|
@ -0,0 +1,154 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for FederationRMFailoverProxyProvider.
|
||||||
|
*/
|
||||||
|
public class TestFederationRMFailoverProxyProvider {
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private FederationStateStore stateStore;
|
||||||
|
private final String dummyCapability = "cap";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException, YarnException {
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
stateStore = new MemoryFederationStateStore();
|
||||||
|
stateStore.init(conf);
|
||||||
|
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
stateStore.close();
|
||||||
|
stateStore = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFederationRMFailoverProxyProvider() throws Exception {
|
||||||
|
final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
|
||||||
|
final MiniYARNCluster cluster = new MiniYARNCluster(
|
||||||
|
"testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
|
||||||
|
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
|
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3");
|
||||||
|
|
||||||
|
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||||
|
2000);
|
||||||
|
|
||||||
|
HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
|
||||||
|
HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
|
||||||
|
HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||||
|
|
||||||
|
cluster.init(conf);
|
||||||
|
cluster.start();
|
||||||
|
|
||||||
|
// Transition rm3 to active;
|
||||||
|
makeRMActive(subClusterId, cluster, 2);
|
||||||
|
|
||||||
|
ApplicationClientProtocol client = FederationProxyProviderUtil
|
||||||
|
.createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
|
||||||
|
UserGroupInformation.getCurrentUser());
|
||||||
|
|
||||||
|
// client will retry until the rm becomes active.
|
||||||
|
GetClusterMetricsResponse response =
|
||||||
|
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
|
||||||
|
|
||||||
|
// validate response
|
||||||
|
checkResponse(response);
|
||||||
|
|
||||||
|
// transition rm3 to standby
|
||||||
|
cluster.getResourceManager(2).getRMContext().getRMAdminService()
|
||||||
|
.transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
|
||||||
|
|
||||||
|
// Transition rm2 to active;
|
||||||
|
makeRMActive(subClusterId, cluster, 1);
|
||||||
|
response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
|
||||||
|
|
||||||
|
// validate response
|
||||||
|
checkResponse(response);
|
||||||
|
|
||||||
|
cluster.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkResponse(GetClusterMetricsResponse response) {
|
||||||
|
Assert.assertNotNull(response.getClusterMetrics());
|
||||||
|
Assert.assertEquals(0,
|
||||||
|
response.getClusterMetrics().getNumActiveNodeManagers());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void makeRMActive(final SubClusterId subClusterId,
|
||||||
|
final MiniYARNCluster cluster, final int index) {
|
||||||
|
try {
|
||||||
|
System.out.println("Transition rm" + (index + 1) + " to active");
|
||||||
|
String dummyAddress = "host:" + index;
|
||||||
|
cluster.getResourceManager(index).getRMContext().getRMAdminService()
|
||||||
|
.transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
|
||||||
|
ResourceManager rm = cluster.getResourceManager(index);
|
||||||
|
InetSocketAddress amRMAddress =
|
||||||
|
rm.getApplicationMasterService().getBindAddress();
|
||||||
|
InetSocketAddress clientRMAddress =
|
||||||
|
rm.getClientRMService().getBindAddress();
|
||||||
|
SubClusterRegisterRequest request = SubClusterRegisterRequest
|
||||||
|
.newInstance(SubClusterInfo.newInstance(subClusterId,
|
||||||
|
amRMAddress.getAddress().getHostAddress() + ":"
|
||||||
|
+ amRMAddress.getPort(),
|
||||||
|
clientRMAddress.getAddress().getHostAddress() + ":"
|
||||||
|
+ clientRMAddress.getPort(),
|
||||||
|
dummyAddress, dummyAddress, SubClusterState.SC_NEW, 1,
|
||||||
|
dummyCapability));
|
||||||
|
stateStore.registerSubCluster(request);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -84,7 +84,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Override
|
@Override
|
||||||
protected InetSocketAddress getRMAddress(YarnConfiguration conf,
|
public InetSocketAddress getRMAddress(YarnConfiguration conf,
|
||||||
Class<?> protocol) throws IOException {
|
Class<?> protocol) throws IOException {
|
||||||
if (protocol == ApplicationClientProtocol.class) {
|
if (protocol == ApplicationClientProtocol.class) {
|
||||||
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
|
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
|
||||||
|
@ -111,7 +111,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Override
|
@Override
|
||||||
protected void checkAllowedProtocols(Class<?> protocol) {
|
public void checkAllowedProtocols(Class<?> protocol) {
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
protocol.isAssignableFrom(ClientRMProtocols.class),
|
protocol.isAssignableFrom(ClientRMProtocols.class),
|
||||||
"RM does not support this client protocol");
|
"RM does not support this client protocol");
|
||||||
|
|
|
@ -71,14 +71,14 @@ public class RMProxy<T> {
|
||||||
* Verify the passed protocol is supported.
|
* Verify the passed protocol is supported.
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
protected void checkAllowedProtocols(Class<?> protocol) {}
|
public void checkAllowedProtocols(Class<?> protocol) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the ResourceManager address from the provided Configuration for the
|
* Get the ResourceManager address from the provided Configuration for the
|
||||||
* given protocol.
|
* given protocol.
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
protected InetSocketAddress getRMAddress(
|
public InetSocketAddress getRMAddress(
|
||||||
YarnConfiguration conf, Class<?> protocol) throws IOException {
|
YarnConfiguration conf, Class<?> protocol) throws IOException {
|
||||||
throw new UnsupportedOperationException("This method should be invoked " +
|
throw new UnsupportedOperationException("This method should be invoked " +
|
||||||
"from an instance of ClientRMProxy or ServerRMProxy");
|
"from an instance of ClientRMProxy or ServerRMProxy");
|
||||||
|
@ -97,7 +97,8 @@ public class RMProxy<T> {
|
||||||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||||
? (YarnConfiguration) configuration
|
? (YarnConfiguration) configuration
|
||||||
: new YarnConfiguration(configuration);
|
: new YarnConfiguration(configuration);
|
||||||
RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
|
RetryPolicy retryPolicy = createRetryPolicy(conf,
|
||||||
|
(HAUtil.isHAEnabled(conf) || HAUtil.isFederationFailoverEnabled(conf)));
|
||||||
return newProxyInstance(conf, protocol, instance, retryPolicy);
|
return newProxyInstance(conf, protocol, instance, retryPolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,7 +124,7 @@ public class RMProxy<T> {
|
||||||
private static <T> T newProxyInstance(final YarnConfiguration conf,
|
private static <T> T newProxyInstance(final YarnConfiguration conf,
|
||||||
final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy)
|
final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy)
|
||||||
throws IOException{
|
throws IOException{
|
||||||
if (HAUtil.isHAEnabled(conf)) {
|
if (HAUtil.isHAEnabled(conf) || HAUtil.isFederationEnabled(conf)) {
|
||||||
RMFailoverProxyProvider<T> provider =
|
RMFailoverProxyProvider<T> provider =
|
||||||
instance.createRMFailoverProxyProvider(conf, protocol);
|
instance.createRMFailoverProxyProvider(conf, protocol);
|
||||||
return (T) RetryProxy.create(protocol, provider, retryPolicy);
|
return (T) RetryProxy.create(protocol, provider, retryPolicy);
|
||||||
|
@ -140,7 +141,7 @@ public class RMProxy<T> {
|
||||||
* RetryProxy.
|
* RetryProxy.
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
<T> T getProxy(final Configuration conf,
|
public <T> T getProxy(final Configuration conf,
|
||||||
final Class<T> protocol, final InetSocketAddress rmAddress)
|
final Class<T> protocol, final InetSocketAddress rmAddress)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return user.doAs(
|
return user.doAs(
|
||||||
|
|
|
@ -2688,6 +2688,13 @@
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<!-- Federation Configuration -->
|
<!-- Federation Configuration -->
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
Flag to indicate whether the RM is participating in Federation or not.
|
||||||
|
</description>
|
||||||
|
<name>yarn.federation.enabled</name>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<description>
|
<description>
|
||||||
Machine list file to be loaded by the FederationSubCluster Resolver
|
Machine list file to be loaded by the FederationSubCluster Resolver
|
||||||
|
|
|
@ -105,12 +105,10 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>javax.cache</groupId>
|
<groupId>javax.cache</groupId>
|
||||||
<artifactId>cache-api</artifactId>
|
<artifactId>cache-api</artifactId>
|
||||||
<version>${jcache.version}</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.ehcache</groupId>
|
<groupId>org.ehcache</groupId>
|
||||||
<artifactId>ehcache</artifactId>
|
<artifactId>ehcache</artifactId>
|
||||||
<version>${ehcache.version}</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@Override
|
@Override
|
||||||
protected InetSocketAddress getRMAddress(YarnConfiguration conf,
|
public InetSocketAddress getRMAddress(YarnConfiguration conf,
|
||||||
Class<?> protocol) {
|
Class<?> protocol) {
|
||||||
if (protocol == ResourceTracker.class) {
|
if (protocol == ResourceTracker.class) {
|
||||||
return conf.getSocketAddr(
|
return conf.getSocketAddr(
|
||||||
|
@ -93,7 +93,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@Override
|
@Override
|
||||||
protected void checkAllowedProtocols(Class<?> protocol) {
|
public void checkAllowedProtocols(Class<?> protocol) {
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
protocol.isAssignableFrom(ResourceTracker.class),
|
protocol.isAssignableFrom(ResourceTracker.class),
|
||||||
"ResourceManager does not support this protocol");
|
"ResourceManager does not support this protocol");
|
||||||
|
|
|
@ -0,0 +1,163 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.failover;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.security.SaslRpcServer;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
|
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
|
||||||
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class that creates proxy for specified protocols when federation is
|
||||||
|
* enabled. The class creates a federation aware failover provider, i.e. the
|
||||||
|
* failover provider uses the {@code FederationStateStore} to determine the
|
||||||
|
* current active ResourceManager
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public final class FederationProxyProviderUtil {
|
||||||
|
|
||||||
|
public static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(FederationProxyProviderUtil.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a proxy for the specified protocol. For non-HA, this is a direct
|
||||||
|
* connection to the ResourceManager address. When HA is enabled, the proxy
|
||||||
|
* handles the failover between the ResourceManagers as well.
|
||||||
|
*
|
||||||
|
* @param configuration Configuration to generate {@link ClientRMProxy}
|
||||||
|
* @param protocol Protocol for the proxy
|
||||||
|
* @param subClusterId the unique identifier or the sub-cluster
|
||||||
|
* @param user the user on whose behalf the proxy is being created
|
||||||
|
* @param <T> Type information of the proxy
|
||||||
|
* @return Proxy to the RM
|
||||||
|
* @throws IOException on failure
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public static <T> T createRMProxy(Configuration configuration,
|
||||||
|
final Class<T> protocol, SubClusterId subClusterId,
|
||||||
|
UserGroupInformation user) throws IOException {
|
||||||
|
return createRMProxy(configuration, protocol, subClusterId, user, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a proxy for the specified protocol. For non-HA, this is a direct
|
||||||
|
* connection to the ResourceManager address. When HA is enabled, the proxy
|
||||||
|
* handles the failover between the ResourceManagers as well.
|
||||||
|
*
|
||||||
|
* @param configuration Configuration to generate {@link ClientRMProxy}
|
||||||
|
* @param protocol Protocol for the proxy
|
||||||
|
* @param subClusterId the unique identifier or the sub-cluster
|
||||||
|
* @param user the user on whose behalf the proxy is being created
|
||||||
|
* @param token the auth token to use for connection
|
||||||
|
* @param <T> Type information of the proxy
|
||||||
|
* @return Proxy to the RM
|
||||||
|
* @throws IOException on failure
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static <T> T createRMProxy(final Configuration configuration,
|
||||||
|
final Class<T> protocol, SubClusterId subClusterId,
|
||||||
|
UserGroupInformation user, final Token token) throws IOException {
|
||||||
|
try {
|
||||||
|
final YarnConfiguration conf = new YarnConfiguration(configuration);
|
||||||
|
updateConf(conf, subClusterId);
|
||||||
|
if (token != null) {
|
||||||
|
LOG.info(
|
||||||
|
"Creating RMProxy with a token: {} to subcluster: {}"
|
||||||
|
+ " for protocol: {}",
|
||||||
|
token, subClusterId, protocol.getSimpleName());
|
||||||
|
user.addToken(token);
|
||||||
|
setAuthModeInConf(conf);
|
||||||
|
} else {
|
||||||
|
LOG.info("Creating RMProxy without a token to subcluster: {}"
|
||||||
|
+ " for protocol: {}", subClusterId, protocol.getSimpleName());
|
||||||
|
}
|
||||||
|
final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
|
||||||
|
@Override
|
||||||
|
public T run() throws Exception {
|
||||||
|
return ClientRMProxy.createRMProxy(conf, protocol);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return proxyConnection;
|
||||||
|
} catch (IOException e) {
|
||||||
|
String message =
|
||||||
|
"Error while creating of RM application master service proxy for"
|
||||||
|
+ " appAttemptId: " + user;
|
||||||
|
LOG.info(message);
|
||||||
|
throw new YarnRuntimeException(message, e);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new YarnRuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setAuthModeInConf(Configuration conf) {
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
SaslRpcServer.AuthMethod.TOKEN.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
// updating the conf with the refreshed RM addresses as proxy creations
|
||||||
|
// are based out of conf
|
||||||
|
private static void updateConf(Configuration conf,
|
||||||
|
SubClusterId subClusterId) {
|
||||||
|
conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId());
|
||||||
|
// In a Federation setting, we will connect to not just the local cluster RM
|
||||||
|
// but also multiple external RMs. The membership information of all the RMs
|
||||||
|
// that are currently
|
||||||
|
// participating in Federation is available in the central
|
||||||
|
// FederationStateStore.
|
||||||
|
// So we will:
|
||||||
|
// 1. obtain the RM service addresses from FederationStateStore using the
|
||||||
|
// FederationRMFailoverProxyProvider.
|
||||||
|
// 2. disable traditional HA as that depends on local configuration lookup
|
||||||
|
// for RMs using indexes.
|
||||||
|
// 3. we will enable federation failover IF traditional HA is enabled so
|
||||||
|
// that the appropriate failover RetryPolicy is initialized.
|
||||||
|
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||||
|
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
|
||||||
|
FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
|
||||||
|
if (HAUtil.isHAEnabled(conf)) {
|
||||||
|
conf.setBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// disable instantiation
|
||||||
|
private FederationProxyProviderUtil() {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,211 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.failover;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
|
||||||
|
import org.apache.hadoop.yarn.client.RMProxy;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A FailoverProxyProvider implementation that uses the
|
||||||
|
* {@code FederationStateStore} to determine the ResourceManager to connect to.
|
||||||
|
* This supports both HA and regular mode which is controlled by configuration.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public class FederationRMFailoverProxyProvider<T>
|
||||||
|
implements RMFailoverProxyProvider<T> {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(FederationRMFailoverProxyProvider.class);
|
||||||
|
|
||||||
|
private RMProxy<T> rmProxy;
|
||||||
|
private Class<T> protocol;
|
||||||
|
private T current;
|
||||||
|
private YarnConfiguration conf;
|
||||||
|
private FederationStateStoreFacade facade;
|
||||||
|
private SubClusterId subClusterId;
|
||||||
|
private Collection<Token<? extends TokenIdentifier>> originalTokens;
|
||||||
|
private boolean federationFailoverEnabled = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Configuration configuration, RMProxy<T> proxy,
|
||||||
|
Class<T> proto) {
|
||||||
|
this.rmProxy = proxy;
|
||||||
|
this.protocol = proto;
|
||||||
|
this.rmProxy.checkAllowedProtocols(this.protocol);
|
||||||
|
String clusterId =
|
||||||
|
configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
|
||||||
|
Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId");
|
||||||
|
this.subClusterId = SubClusterId.newInstance(clusterId);
|
||||||
|
this.facade = facade.getInstance();
|
||||||
|
if (configuration instanceof YarnConfiguration) {
|
||||||
|
this.conf = (YarnConfiguration) configuration;
|
||||||
|
}
|
||||||
|
federationFailoverEnabled =
|
||||||
|
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
|
||||||
|
|
||||||
|
conf.setInt(
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||||
|
conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES,
|
||||||
|
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES));
|
||||||
|
|
||||||
|
conf.setInt(
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||||
|
conf.getInt(
|
||||||
|
YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
|
||||||
|
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
|
||||||
|
|
||||||
|
try {
|
||||||
|
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
|
||||||
|
originalTokens = currentUser.getTokens();
|
||||||
|
LOG.info("Initialized Federation proxy for user: {}",
|
||||||
|
currentUser.getUserName());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Could not get information of requester, ignoring for now.");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addOriginalTokens(UserGroupInformation currentUser) {
|
||||||
|
if (originalTokens == null || originalTokens.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Token<? extends TokenIdentifier> token : originalTokens) {
|
||||||
|
currentUser.addToken(token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private T getProxyInternal(boolean isFailover) {
|
||||||
|
SubClusterInfo subClusterInfo;
|
||||||
|
UserGroupInformation currentUser = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
|
||||||
|
subClusterId);
|
||||||
|
subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
|
||||||
|
// updating the conf with the refreshed RM addresses as proxy
|
||||||
|
// creations
|
||||||
|
// are based out of conf
|
||||||
|
updateRMAddress(subClusterInfo);
|
||||||
|
currentUser = UserGroupInformation.getCurrentUser();
|
||||||
|
addOriginalTokens(currentUser);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error("Exception while trying to create proxy to the ResourceManager"
|
||||||
|
+ " for SubClusterId: {}", subClusterId, e);
|
||||||
|
return null;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Could not get information of requester, ignoring for now.");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
||||||
|
LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
|
||||||
|
protocol.getSimpleName(), currentUser);
|
||||||
|
LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
|
||||||
|
subClusterId);
|
||||||
|
return rmProxy.getProxy(conf, protocol, rmAddress);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error(
|
||||||
|
"IOException while trying to create proxy to the ResourceManager"
|
||||||
|
+ " for SubClusterId: {}",
|
||||||
|
subClusterId, ioe);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateRMAddress(SubClusterInfo subClusterInfo) {
|
||||||
|
if (subClusterInfo != null) {
|
||||||
|
if (protocol == ApplicationClientProtocol.class) {
|
||||||
|
conf.set(YarnConfiguration.RM_ADDRESS,
|
||||||
|
subClusterInfo.getClientRMServiceAddress());
|
||||||
|
} else if (protocol == ApplicationMasterProtocol.class) {
|
||||||
|
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
|
subClusterInfo.getAMRMServiceAddress());
|
||||||
|
} else if (protocol == ResourceManagerAdministrationProtocol.class) {
|
||||||
|
conf.set(YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||||
|
subClusterInfo.getRMAdminServiceAddress());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized ProxyInfo<T> getProxy() {
|
||||||
|
if (current == null) {
|
||||||
|
current = getProxyInternal(false);
|
||||||
|
}
|
||||||
|
return new ProxyInfo<T>(current, subClusterId.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void performFailover(T currentProxy) {
|
||||||
|
closeInternal(currentProxy);
|
||||||
|
current = getProxyInternal(federationFailoverEnabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Class<T> getInterface() {
|
||||||
|
return protocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closeInternal(T currentProxy) {
|
||||||
|
if ((currentProxy != null) && (currentProxy instanceof Closeable)) {
|
||||||
|
try {
|
||||||
|
((Closeable) currentProxy).close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Exception while trying to close proxy", e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
RPC.stopProxy(currentProxy);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close all the proxy objects which have been opened over the lifetime of
|
||||||
|
* this proxy provider.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void close() throws IOException {
|
||||||
|
closeInternal(current);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.federation.failover;
|
Loading…
Reference in New Issue