HDFS-13848. Refactor NameNode failover proxy providers. Contributed by Konstantin Shvachko.

(cherry picked from commit a4121c71c2)
This commit is contained in:
Konstantin V Shvachko 2018-08-24 18:27:30 -07:00
parent e4282c077b
commit 60fd202a59
5 changed files with 178 additions and 183 deletions

View File

@ -30,27 +30,30 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@InterfaceStability.Evolving
public interface FailoverProxyProvider<T> extends Closeable {
public static final class ProxyInfo<T> {
public final T proxy;
static class ProxyInfo<T> {
public T proxy;
/*
* The information (e.g., the IP address) of the current proxy object. It
* provides information for debugging purposes.
*/
public final String proxyInfo;
public String proxyInfo;
public ProxyInfo(T proxy, String proxyInfo) {
this.proxy = proxy;
this.proxyInfo = proxyInfo;
}
private String proxyName() {
return proxy != null ? proxy.getClass().getSimpleName() : "UnknownProxy";
}
public String getString(String methodName) {
return proxy.getClass().getSimpleName() + "." + methodName
+ " over " + proxyInfo;
return proxyName() + "." + methodName + " over " + proxyInfo;
}
@Override
public String toString() {
return proxy.getClass().getSimpleName() + " over " + proxyInfo;
return proxyName() + " over " + proxyInfo;
}
}

View File

@ -396,7 +396,7 @@ public class DFSUtilClient {
* @param keys Set of keys to look for in the order of preference
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
static Map<String, Map<String, InetSocketAddress>> getAddresses(
public static Map<String, Map<String, InetSocketAddress>> getAddresses(
Configuration conf, String defaultAddress, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);

View File

@ -18,14 +18,68 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractNNFailoverProxyProvider<T> implements
FailoverProxyProvider <T> {
protected static final Logger LOG =
LoggerFactory.getLogger(AbstractNNFailoverProxyProvider.class);
private AtomicBoolean fallbackToSimpleAuth;
protected Configuration conf;
protected Class<T> xface;
protected HAProxyFactory<T> factory;
protected UserGroupInformation ugi;
protected AtomicBoolean fallbackToSimpleAuth;
protected AbstractNNFailoverProxyProvider() {
}
protected AbstractNNFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory) {
this.conf = new Configuration(conf);
this.xface = xface;
this.factory = factory;
try {
this.ugi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new RuntimeException(e);
}
int maxRetries = this.conf.getInt(
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
this.conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
maxRetries);
int maxRetriesOnSocketTimeouts = this.conf.getInt(
HdfsClientConfigKeys
.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
HdfsClientConfigKeys
.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
this.conf.setInt(
CommonConfigurationKeysPublic
.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
maxRetriesOnSocketTimeouts);
}
/**
* Inquire whether logical HA URI is used for the implementation. If it is
@ -51,4 +105,100 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
public synchronized AtomicBoolean getFallbackToSimpleAuth() {
return fallbackToSimpleAuth;
}
/**
* ProxyInfo to a NameNode. Includes its address.
*/
public static class NNProxyInfo<T> extends ProxyInfo<T> {
private InetSocketAddress address;
public NNProxyInfo(InetSocketAddress address) {
super(null, address.toString());
this.address = address;
}
public InetSocketAddress getAddress() {
return address;
}
}
@Override
public Class<T> getInterface() {
return xface;
}
/**
* Create a proxy if it has not been created yet.
*/
protected NNProxyInfo<T> createProxyIfNeeded(NNProxyInfo<T> pi) {
if (pi.proxy == null) {
assert pi.getAddress() != null : "Proxy address is null";
try {
pi.proxy = factory.createProxy(conf,
pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
} catch (IOException ioe) {
LOG.error("{} Failed to create RPC proxy to NameNode",
this.getClass().getSimpleName(), ioe);
throw new RuntimeException(ioe);
}
}
return pi;
}
/**
* Get list of configured NameNode proxy addresses.
* Randomize the list if requested.
*/
protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey) {
final List<NNProxyInfo<T>> proxies = new ArrayList<NNProxyInfo<T>>();
Map<String, Map<String, InetSocketAddress>> map =
DFSUtilClient.getAddresses(conf, null, addressKey);
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
if (addressesInNN == null || addressesInNN.size() == 0) {
throw new RuntimeException("Could not find any configured addresses " +
"for URI " + uri);
}
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
for (InetSocketAddress address : addressesOfNns) {
proxies.add(new NNProxyInfo<T>(address));
}
// Randomize the list to prevent all clients pointing to the same one
boolean randomized = getRandomOrder(conf, uri);
if (randomized) {
Collections.shuffle(proxies);
}
// The client may have a delegation token set for the logical
// URI of the cluster. Clone this token to apply to each of the
// underlying IPC addresses so that the IPC code can find it.
HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
return proxies;
}
/**
* Check whether random order is configured for failover proxy provider
* for the namenode/nameservice.
*
* @param conf Configuration
* @param nameNodeUri The URI of namenode/nameservice
* @return random order configuration
*/
public static boolean getRandomOrder(
Configuration conf, URI nameNodeUri) {
String host = nameNodeUri.getHost();
String configKeyWithHost = HdfsClientConfigKeys.Failover.RANDOM_ORDER
+ "." + host;
if (conf.get(configKeyWithHost) != null) {
return conf.getBoolean(
configKeyWithHost,
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
}
return conf.getBoolean(
HdfsClientConfigKeys.Failover.RANDOM_ORDER,
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
}
}

View File

@ -19,23 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A FailoverProxyProvider implementation which allows one to configure
@ -46,97 +35,15 @@ import org.slf4j.LoggerFactory;
public class ConfiguredFailoverProxyProvider<T> extends
AbstractNNFailoverProxyProvider<T> {
private static final Logger LOG =
LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class);
protected final Configuration conf;
protected final List<AddressRpcProxyPair<T>> proxies =
new ArrayList<AddressRpcProxyPair<T>>();
protected final UserGroupInformation ugi;
protected final Class<T> xface;
protected final List<NNProxyInfo<T>> proxies;
private int currentProxyIndex = 0;
protected final HAProxyFactory<T> factory;
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory) {
this.xface = xface;
this.conf = new Configuration(conf);
int maxRetries = this.conf.getInt(
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
this.conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
maxRetries);
int maxRetriesOnSocketTimeouts = this.conf.getInt(
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
this.conf.setInt(
CommonConfigurationKeysPublic
.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
maxRetriesOnSocketTimeouts);
try {
ugi = UserGroupInformation.getCurrentUser();
Map<String, Map<String, InetSocketAddress>> map =
DFSUtilClient.getHaNnRpcAddresses(conf);
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
if (addressesInNN == null || addressesInNN.size() == 0) {
throw new RuntimeException("Could not find any configured addresses " +
"for URI " + uri);
}
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
for (InetSocketAddress address : addressesOfNns) {
proxies.add(new AddressRpcProxyPair<T>(address));
}
// Randomize the list to prevent all clients pointing to the same one
boolean randomized = getRandomOrder(conf, uri);
if (randomized) {
Collections.shuffle(proxies);
}
// The client may have a delegation token set for the logical
// URI of the cluster. Clone this token to apply to each of the
// underlying IPC addresses so that the IPC code can find it.
HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
this.factory = factory;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Check whether random order is configured for failover proxy provider
* for the namenode/nameservice.
*
* @param conf Configuration
* @param nameNodeUri The URI of namenode/nameservice
* @return random order configuration
*/
private static boolean getRandomOrder(
Configuration conf, URI nameNodeUri) {
String host = nameNodeUri.getHost();
String configKeyWithHost = HdfsClientConfigKeys.Failover.RANDOM_ORDER
+ "." + host;
if (conf.get(configKeyWithHost) != null) {
return conf.getBoolean(
configKeyWithHost,
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
}
return conf.getBoolean(
HdfsClientConfigKeys.Failover.RANDOM_ORDER,
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
}
@Override
public Class<T> getInterface() {
return xface;
super(conf, uri, xface, factory);
this.proxies = getProxyAddresses(uri,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/**
@ -144,21 +51,8 @@ public class ConfiguredFailoverProxyProvider<T> extends
*/
@Override
public synchronized ProxyInfo<T> getProxy() {
AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
return getProxy(current);
}
protected ProxyInfo<T> getProxy(AddressRpcProxyPair<T> current) {
if (current.namenode == null) {
try {
current.namenode = factory.createProxy(conf,
current.address, xface, ugi, false, getFallbackToSimpleAuth());
} catch (IOException e) {
LOG.error("Failed to create RPC proxy to NameNode", e);
throw new RuntimeException(e);
}
}
return new ProxyInfo<T>(current.namenode, current.address.toString());
NNProxyInfo<T> current = proxies.get(currentProxyIndex);
return createProxyIfNeeded(current);
}
@Override
@ -170,31 +64,18 @@ public class ConfiguredFailoverProxyProvider<T> extends
currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
}
/**
* A little pair object to store the address and connected RPC proxy object to
* an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
*/
protected static class AddressRpcProxyPair<T> {
public final InetSocketAddress address;
public T namenode;
public AddressRpcProxyPair(InetSocketAddress address) {
this.address = address;
}
}
/**
* Close all the proxy objects which have been opened over the lifetime of
* this proxy provider.
*/
@Override
public synchronized void close() throws IOException {
for (AddressRpcProxyPair<T> proxy : proxies) {
if (proxy.namenode != null) {
if (proxy.namenode instanceof Closeable) {
((Closeable)proxy.namenode).close();
for (ProxyInfo<T> proxy : proxies) {
if (proxy.proxy != null) {
if (proxy.proxy instanceof Closeable) {
((Closeable)proxy.proxy).close();
} else {
RPC.stopProxy(proxy.namenode);
RPC.stopProxy(proxy.proxy);
}
}
}

View File

@ -19,15 +19,11 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
/**
* A NNFailoverProxyProvider implementation which works on IP failover setup.
@ -47,53 +43,18 @@ import org.apache.hadoop.security.UserGroupInformation;
*/
public class IPFailoverProxyProvider<T> extends
AbstractNNFailoverProxyProvider<T> {
private final Configuration conf;
private final Class<T> xface;
private final URI nameNodeUri;
private final HAProxyFactory<T> factory;
private ProxyInfo<T> nnProxyInfo = null;
private final NNProxyInfo<T> nnProxyInfo;
public IPFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory) {
this.xface = xface;
this.nameNodeUri = uri;
this.factory = factory;
this.conf = new Configuration(conf);
int maxRetries = this.conf.getInt(
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
this.conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
maxRetries);
int maxRetriesOnSocketTimeouts = this.conf.getInt(
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
this.conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
maxRetriesOnSocketTimeouts);
super(conf, uri, xface, factory);
this.nnProxyInfo = new NNProxyInfo<T>(DFSUtilClient.getNNAddress(uri));
}
@Override
public Class<T> getInterface() {
return xface;
}
@Override
public synchronized ProxyInfo<T> getProxy() {
public synchronized NNProxyInfo<T> getProxy() {
// Create a non-ha proxy if not already created.
if (nnProxyInfo == null) {
try {
// Create a proxy that is not wrapped in RetryProxy
InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
nnProxyInfo = new ProxyInfo<T>(factory.createProxy(conf, nnAddr, xface,
UserGroupInformation.getCurrentUser(), false), nnAddr.toString());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
return nnProxyInfo;
return createProxyIfNeeded(nnProxyInfo);
}
/** Nothing to do for IP failover */
@ -106,7 +67,7 @@ public class IPFailoverProxyProvider<T> extends
*/
@Override
public synchronized void close() throws IOException {
if (nnProxyInfo == null) {
if (nnProxyInfo.proxy == null) {
return;
}
if (nnProxyInfo.proxy instanceof Closeable) {