HADOOP-16938. Make non-HA proxy providers pluggable

This commit is contained in:
RogPodge 2020-03-25 08:06:58 -07:00 committed by GitHub
parent cdb2107066
commit 2d294bd575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 139 additions and 10 deletions

View File

@ -958,6 +958,10 @@ public static boolean isAclEnabled(Configuration conf) {
CLIENT_FAILOVER_PREFIX + "proxy-provider"; CLIENT_FAILOVER_PREFIX + "proxy-provider";
public static final String DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER = public static final String DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER =
"org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider"; "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider";
public static final String CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER =
CLIENT_FAILOVER_PREFIX + "no-ha-proxy-provider";
public static final String DEFAULT_CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER =
"org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider";
public static final String CLIENT_FAILOVER_MAX_ATTEMPTS = public static final String CLIENT_FAILOVER_MAX_ATTEMPTS =
CLIENT_FAILOVER_PREFIX + "max-attempts"; CLIENT_FAILOVER_PREFIX + "max-attempts";

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* An implementation of {@link RMFailoverProxyProvider} which does nothing in
* the event of failover, and always returns the same proxy object.
* This is the default non-HA RM Failover proxy provider. It is used to replace
* {@link DefaultFailoverProxyProvider} which was used as Yarn default non-HA.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class DefaultNoHARMFailoverProxyProvider<T>
implements RMFailoverProxyProvider<T> {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultNoHARMFailoverProxyProvider.class);
protected T proxy;
protected Class<T> protocol;
/**
* Initialize internal data structures, invoked right after instantiation.
*
* @param conf Configuration to use
* @param proxy The {@link RMProxy} instance to use
* @param protocol The communication protocol to use
*/
@Override
public void init(Configuration conf, RMProxy<T> proxy,
Class<T> protocol) {
this.protocol = protocol;
try {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress =
proxy.getRMAddress(yarnConf, protocol);
LOG.info("Connecting to ResourceManager at {}", rmAddress);
this.proxy = proxy.getProxy(yarnConf, protocol, rmAddress);
} catch (IOException ioe) {
LOG.error("Unable to create proxy to the ResourceManager ", ioe);
}
}
@Override
public Class<T> getInterface() {
return protocol;
}
@Override
public ProxyInfo<T> getProxy() {
return new ProxyInfo<T>(proxy, null);
}
/**
* PerformFailover does nothing in this class.
* @param currentProxy
*/
@Override
public void performFailover(T currentProxy) {
// Nothing to do.
}
/**
* Close the current proxy.
* @throws IOException
*/
@Override
public void close() throws IOException {
RPC.stopProxy(proxy);
}
}

View File

@ -56,8 +56,7 @@
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class RMProxy<T> { public class RMProxy<T> {
private static final Logger LOG = private static final Logger LOG = LoggerFactory.getLogger(RMProxy.class);
LoggerFactory.getLogger(RMProxy.class);
private UserGroupInformation user; private UserGroupInformation user;
protected RMProxy() { protected RMProxy() {
@ -125,16 +124,13 @@ protected static <T> T createRMProxy(final Configuration configuration,
private static <T> T newProxyInstance(final YarnConfiguration conf, private static <T> T newProxyInstance(final YarnConfiguration conf,
final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy) final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy)
throws IOException{ throws IOException{
RMFailoverProxyProvider<T> provider;
if (HAUtil.isHAEnabled(conf) || HAUtil.isFederationEnabled(conf)) { if (HAUtil.isHAEnabled(conf) || HAUtil.isFederationEnabled(conf)) {
RMFailoverProxyProvider<T> provider = provider = instance.createRMFailoverProxyProvider(conf, protocol);
instance.createRMFailoverProxyProvider(conf, protocol);
return (T) RetryProxy.create(protocol, provider, retryPolicy);
} else { } else {
InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol); provider = instance.createNonHaRMFailoverProxyProvider(conf, protocol);
LOG.info("Connecting to ResourceManager at " + rmAddress);
T proxy = instance.getProxy(conf, protocol, rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
} }
return (T) RetryProxy.create(protocol, provider, retryPolicy);
} }
/** /**
@ -154,6 +150,28 @@ public T run() {
}); });
} }
/**
* Helper method to create non-HA RMFailoverProxyProvider.
*/
private <T> RMFailoverProxyProvider<T> createNonHaRMFailoverProxyProvider(
Configuration conf, Class<T> protocol) {
Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
try {
defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
Class.forName(
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER);
} catch (Exception e) {
throw new YarnRuntimeException("Invalid default failover provider class" +
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER, e);
}
RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER,
defaultProviderClass, RMFailoverProxyProvider.class), conf);
provider.init(conf, (RMProxy<T>) this, protocol);
return provider;
}
/** /**
* Helper method to create FailoverProxyProvider. * Helper method to create FailoverProxyProvider.
*/ */

View File

@ -728,6 +728,14 @@
<value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value> <value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value>
</property> </property>
<property>
<description>When HA is not enabled, the class to be used by Clients, AMs and
NMs to retry connecting to the Active RM. It should extend
org.apache.hadoop.yarn.client.RMFailoverProxyProvider</description>
<name>yarn.client.failover-no-ha-proxy-provider</name>
<value>org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider</value>
</property>
<property> <property>
<description>When HA is enabled, the max number of times <description>When HA is enabled, the max number of times
FailoverProxyProvider should attempt failover. When set, FailoverProxyProvider should attempt failover. When set,

View File

@ -41,7 +41,7 @@ The RMs have an option to embed the Zookeeper-based ActiveStandbyElector to deci
#### Client, ApplicationMaster and NodeManager on RM failover #### Client, ApplicationMaster and NodeManager on RM failover
When there are multiple RMs, the configuration (yarn-site.xml) used by clients and nodes is expected to list all the RMs. Clients, ApplicationMasters (AMs) and NodeManagers (NMs) try connecting to the RMs in a round-robin fashion until they hit the Active RM. If the Active goes down, they resume the round-robin polling until they hit the "new" Active. This default retry logic is implemented as `org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider`. You can override the logic by implementing `org.apache.hadoop.yarn.client.RMFailoverProxyProvider` and setting the value of `yarn.client.failover-proxy-provider` to the class name. When there are multiple RMs, the configuration (yarn-site.xml) used by clients and nodes is expected to list all the RMs. Clients, ApplicationMasters (AMs) and NodeManagers (NMs) try connecting to the RMs in a round-robin fashion until they hit the Active RM. If the Active goes down, they resume the round-robin polling until they hit the "new" Active. This default retry logic is implemented as `org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider`. You can override the logic by implementing `org.apache.hadoop.yarn.client.RMFailoverProxyProvider` and setting the value of `yarn.client.failover-proxy-provider` to the class name. When running in non-ha mode, set the value of `yarn.client.failover-no-ha-proxy-provider` instead
### Recovering previous active-RM's state ### Recovering previous active-RM's state
@ -71,6 +71,7 @@ Most of the failover functionality is tunable using various configuration proper
| `yarn.resourcemanager.ha.automatic-failover.embedded` | Use embedded leader-elector to pick the Active RM, when automatic failover is enabled. By default, it is enabled only when HA is enabled. | | `yarn.resourcemanager.ha.automatic-failover.embedded` | Use embedded leader-elector to pick the Active RM, when automatic failover is enabled. By default, it is enabled only when HA is enabled. |
| `yarn.resourcemanager.cluster-id` | Identifies the cluster. Used by the elector to ensure an RM doesn't take over as Active for another cluster. | | `yarn.resourcemanager.cluster-id` | Identifies the cluster. Used by the elector to ensure an RM doesn't take over as Active for another cluster. |
| `yarn.client.failover-proxy-provider` | The class to be used by Clients, AMs and NMs to failover to the Active RM. | | `yarn.client.failover-proxy-provider` | The class to be used by Clients, AMs and NMs to failover to the Active RM. |
| `yarn.client.failover-no-ha-proxy-provider` | The class to be used by Clients, AMs and NMs to failover to the Active RM, when not running in HA mode |
| `yarn.client.failover-max-attempts` | The max number of times FailoverProxyProvider should attempt failover. | | `yarn.client.failover-max-attempts` | The max number of times FailoverProxyProvider should attempt failover. |
| `yarn.client.failover-sleep-base-ms` | The sleep base (in milliseconds) to be used for calculating the exponential delay between failovers. | | `yarn.client.failover-sleep-base-ms` | The sleep base (in milliseconds) to be used for calculating the exponential delay between failovers. |
| `yarn.client.failover-sleep-max-ms` | The maximum sleep time (in milliseconds) between failovers. | | `yarn.client.failover-sleep-max-ms` | The maximum sleep time (in milliseconds) between failovers. |