From 60fd202a59e58cf187c5d817ab60d7d44affc9ad Mon Sep 17 00:00:00 2001 From: Konstantin V Shvachko Date: Fri, 24 Aug 2018 18:27:30 -0700 Subject: [PATCH] HDFS-13848. Refactor NameNode failover proxy providers. Contributed by Konstantin Shvachko. (cherry picked from commit a4121c71c29d13866a605d9c0d013e5de9c147c3) --- .../io/retry/FailoverProxyProvider.java | 15 +- .../org/apache/hadoop/hdfs/DFSUtilClient.java | 2 +- .../ha/AbstractNNFailoverProxyProvider.java | 152 +++++++++++++++++- .../ha/ConfiguredFailoverProxyProvider.java | 141 ++-------------- .../namenode/ha/IPFailoverProxyProvider.java | 51 +----- 5 files changed, 178 insertions(+), 183 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java index c73e0837721..f2fa3af7d59 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java @@ -30,27 +30,30 @@ import org.apache.hadoop.classification.InterfaceStability; */ @InterfaceStability.Evolving public interface FailoverProxyProvider extends Closeable { - public static final class ProxyInfo { - public final T proxy; + static class ProxyInfo { + public T proxy; /* * The information (e.g., the IP address) of the current proxy object. It * provides information for debugging purposes. */ - public final String proxyInfo; + public String proxyInfo; public ProxyInfo(T proxy, String proxyInfo) { this.proxy = proxy; this.proxyInfo = proxyInfo; } + private String proxyName() { + return proxy != null ? proxy.getClass().getSimpleName() : "UnknownProxy"; + } + public String getString(String methodName) { - return proxy.getClass().getSimpleName() + "." + methodName - + " over " + proxyInfo; + return proxyName() + "." + methodName + " over " + proxyInfo; } @Override public String toString() { - return proxy.getClass().getSimpleName() + " over " + proxyInfo; + return proxyName() + " over " + proxyInfo; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 313b973550c..dc232835d16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -396,7 +396,7 @@ public class DFSUtilClient { * @param keys Set of keys to look for in the order of preference * @return a map(nameserviceId to map(namenodeId to InetSocketAddress)) */ - static Map> getAddresses( + public static Map> getAddresses( Configuration conf, String defaultAddress, String... keys) { Collection nameserviceIds = getNameServiceIds(conf); return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java index e0fdb3242d8..252b70dde44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java @@ -18,14 +18,68 @@ package org.apache.hadoop.hdfs.server.namenode.ha; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.HAUtilClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractNNFailoverProxyProvider implements FailoverProxyProvider { + protected static final Logger LOG = + LoggerFactory.getLogger(AbstractNNFailoverProxyProvider.class); - private AtomicBoolean fallbackToSimpleAuth; + protected Configuration conf; + protected Class xface; + protected HAProxyFactory factory; + protected UserGroupInformation ugi; + protected AtomicBoolean fallbackToSimpleAuth; + + protected AbstractNNFailoverProxyProvider() { + } + + protected AbstractNNFailoverProxyProvider(Configuration conf, URI uri, + Class xface, HAProxyFactory factory) { + this.conf = new Configuration(conf); + this.xface = xface; + this.factory = factory; + try { + this.ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + int maxRetries = this.conf.getInt( + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY, + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT); + this.conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + maxRetries); + + int maxRetriesOnSocketTimeouts = this.conf.getInt( + HdfsClientConfigKeys + .Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + HdfsClientConfigKeys + .Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); + this.conf.setInt( + CommonConfigurationKeysPublic + .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + maxRetriesOnSocketTimeouts); + } /** * Inquire whether logical HA URI is used for the implementation. If it is @@ -51,4 +105,100 @@ public abstract class AbstractNNFailoverProxyProvider implements public synchronized AtomicBoolean getFallbackToSimpleAuth() { return fallbackToSimpleAuth; } + + /** + * ProxyInfo to a NameNode. Includes its address. + */ + public static class NNProxyInfo extends ProxyInfo { + private InetSocketAddress address; + + public NNProxyInfo(InetSocketAddress address) { + super(null, address.toString()); + this.address = address; + } + + public InetSocketAddress getAddress() { + return address; + } + } + + @Override + public Class getInterface() { + return xface; + } + + /** + * Create a proxy if it has not been created yet. + */ + protected NNProxyInfo createProxyIfNeeded(NNProxyInfo pi) { + if (pi.proxy == null) { + assert pi.getAddress() != null : "Proxy address is null"; + try { + pi.proxy = factory.createProxy(conf, + pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth()); + } catch (IOException ioe) { + LOG.error("{} Failed to create RPC proxy to NameNode", + this.getClass().getSimpleName(), ioe); + throw new RuntimeException(ioe); + } + } + return pi; + } + + /** + * Get list of configured NameNode proxy addresses. + * Randomize the list if requested. + */ + protected List> getProxyAddresses(URI uri, String addressKey) { + final List> proxies = new ArrayList>(); + Map> map = + DFSUtilClient.getAddresses(conf, null, addressKey); + Map addressesInNN = map.get(uri.getHost()); + + if (addressesInNN == null || addressesInNN.size() == 0) { + throw new RuntimeException("Could not find any configured addresses " + + "for URI " + uri); + } + + Collection addressesOfNns = addressesInNN.values(); + for (InetSocketAddress address : addressesOfNns) { + proxies.add(new NNProxyInfo(address)); + } + // Randomize the list to prevent all clients pointing to the same one + boolean randomized = getRandomOrder(conf, uri); + if (randomized) { + Collections.shuffle(proxies); + } + + // The client may have a delegation token set for the logical + // URI of the cluster. Clone this token to apply to each of the + // underlying IPC addresses so that the IPC code can find it. + HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns); + return proxies; + } + + /** + * Check whether random order is configured for failover proxy provider + * for the namenode/nameservice. + * + * @param conf Configuration + * @param nameNodeUri The URI of namenode/nameservice + * @return random order configuration + */ + public static boolean getRandomOrder( + Configuration conf, URI nameNodeUri) { + String host = nameNodeUri.getHost(); + String configKeyWithHost = HdfsClientConfigKeys.Failover.RANDOM_ORDER + + "." + host; + + if (conf.get(configKeyWithHost) != null) { + return conf.getBoolean( + configKeyWithHost, + HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); + } + + return conf.getBoolean( + HdfsClientConfigKeys.Failover.RANDOM_ORDER, + HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index 96722fcfab3..dc95538c1fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -19,23 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import java.io.Closeable; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.HAUtilClient; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A FailoverProxyProvider implementation which allows one to configure @@ -46,97 +35,15 @@ import org.slf4j.LoggerFactory; public class ConfiguredFailoverProxyProvider extends AbstractNNFailoverProxyProvider { - private static final Logger LOG = - LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class); - - protected final Configuration conf; - protected final List> proxies = - new ArrayList>(); - protected final UserGroupInformation ugi; - protected final Class xface; + protected final List> proxies; private int currentProxyIndex = 0; - protected final HAProxyFactory factory; public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, Class xface, HAProxyFactory factory) { - this.xface = xface; - this.conf = new Configuration(conf); - int maxRetries = this.conf.getInt( - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY, - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT); - this.conf.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, - maxRetries); - - int maxRetriesOnSocketTimeouts = this.conf.getInt( - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); - this.conf.setInt( - CommonConfigurationKeysPublic - .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - maxRetriesOnSocketTimeouts); - - try { - ugi = UserGroupInformation.getCurrentUser(); - - Map> map = - DFSUtilClient.getHaNnRpcAddresses(conf); - Map addressesInNN = map.get(uri.getHost()); - - if (addressesInNN == null || addressesInNN.size() == 0) { - throw new RuntimeException("Could not find any configured addresses " + - "for URI " + uri); - } - - Collection addressesOfNns = addressesInNN.values(); - for (InetSocketAddress address : addressesOfNns) { - proxies.add(new AddressRpcProxyPair(address)); - } - // Randomize the list to prevent all clients pointing to the same one - boolean randomized = getRandomOrder(conf, uri); - if (randomized) { - Collections.shuffle(proxies); - } - - // The client may have a delegation token set for the logical - // URI of the cluster. Clone this token to apply to each of the - // underlying IPC addresses so that the IPC code can find it. - HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns); - this.factory = factory; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Check whether random order is configured for failover proxy provider - * for the namenode/nameservice. - * - * @param conf Configuration - * @param nameNodeUri The URI of namenode/nameservice - * @return random order configuration - */ - private static boolean getRandomOrder( - Configuration conf, URI nameNodeUri) { - String host = nameNodeUri.getHost(); - String configKeyWithHost = HdfsClientConfigKeys.Failover.RANDOM_ORDER - + "." + host; - - if (conf.get(configKeyWithHost) != null) { - return conf.getBoolean( - configKeyWithHost, - HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); - } - - return conf.getBoolean( - HdfsClientConfigKeys.Failover.RANDOM_ORDER, - HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); - } - - @Override - public Class getInterface() { - return xface; + super(conf, uri, xface, factory); + this.proxies = getProxyAddresses(uri, + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); } /** @@ -144,21 +51,8 @@ public class ConfiguredFailoverProxyProvider extends */ @Override public synchronized ProxyInfo getProxy() { - AddressRpcProxyPair current = proxies.get(currentProxyIndex); - return getProxy(current); - } - - protected ProxyInfo getProxy(AddressRpcProxyPair current) { - if (current.namenode == null) { - try { - current.namenode = factory.createProxy(conf, - current.address, xface, ugi, false, getFallbackToSimpleAuth()); - } catch (IOException e) { - LOG.error("Failed to create RPC proxy to NameNode", e); - throw new RuntimeException(e); - } - } - return new ProxyInfo(current.namenode, current.address.toString()); + NNProxyInfo current = proxies.get(currentProxyIndex); + return createProxyIfNeeded(current); } @Override @@ -170,31 +64,18 @@ public class ConfiguredFailoverProxyProvider extends currentProxyIndex = (currentProxyIndex + 1) % proxies.size(); } - /** - * A little pair object to store the address and connected RPC proxy object to - * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null. - */ - protected static class AddressRpcProxyPair { - public final InetSocketAddress address; - public T namenode; - - public AddressRpcProxyPair(InetSocketAddress address) { - this.address = address; - } - } - /** * Close all the proxy objects which have been opened over the lifetime of * this proxy provider. */ @Override public synchronized void close() throws IOException { - for (AddressRpcProxyPair proxy : proxies) { - if (proxy.namenode != null) { - if (proxy.namenode instanceof Closeable) { - ((Closeable)proxy.namenode).close(); + for (ProxyInfo proxy : proxies) { + if (proxy.proxy != null) { + if (proxy.proxy instanceof Closeable) { + ((Closeable)proxy.proxy).close(); } else { - RPC.stopProxy(proxy.namenode); + RPC.stopProxy(proxy.proxy); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java index ed250a0f42e..e70374047a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java @@ -19,15 +19,11 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import java.io.Closeable; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.URI; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; /** * A NNFailoverProxyProvider implementation which works on IP failover setup. @@ -47,53 +43,18 @@ import org.apache.hadoop.security.UserGroupInformation; */ public class IPFailoverProxyProvider extends AbstractNNFailoverProxyProvider { - private final Configuration conf; - private final Class xface; - private final URI nameNodeUri; - private final HAProxyFactory factory; - private ProxyInfo nnProxyInfo = null; + private final NNProxyInfo nnProxyInfo; public IPFailoverProxyProvider(Configuration conf, URI uri, Class xface, HAProxyFactory factory) { - this.xface = xface; - this.nameNodeUri = uri; - this.factory = factory; - - this.conf = new Configuration(conf); - int maxRetries = this.conf.getInt( - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY, - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT); - this.conf.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, - maxRetries); - - int maxRetriesOnSocketTimeouts = this.conf.getInt( - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); - this.conf.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - maxRetriesOnSocketTimeouts); + super(conf, uri, xface, factory); + this.nnProxyInfo = new NNProxyInfo(DFSUtilClient.getNNAddress(uri)); } @Override - public Class getInterface() { - return xface; - } - - @Override - public synchronized ProxyInfo getProxy() { + public synchronized NNProxyInfo getProxy() { // Create a non-ha proxy if not already created. - if (nnProxyInfo == null) { - try { - // Create a proxy that is not wrapped in RetryProxy - InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri); - nnProxyInfo = new ProxyInfo(factory.createProxy(conf, nnAddr, xface, - UserGroupInformation.getCurrentUser(), false), nnAddr.toString()); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - return nnProxyInfo; + return createProxyIfNeeded(nnProxyInfo); } /** Nothing to do for IP failover */ @@ -106,7 +67,7 @@ public class IPFailoverProxyProvider extends */ @Override public synchronized void close() throws IOException { - if (nnProxyInfo == null) { + if (nnProxyInfo.proxy == null) { return; } if (nnProxyInfo.proxy instanceof Closeable) {