diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index f2cec314ffe..4587d3530e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -184,6 +184,9 @@ public interface HdfsClientConfigKeys { "dfs.namenode.snapshot.capture.openfiles"; boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false; + String DFS_CLIENT_FAILOVER_IPFAILOVER_VIRTUAL_ADDRESS = + Failover.PREFIX + "ipfailover.virtual-address"; + /** * These are deprecated config keys to client code. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java index 1dbd02cb381..751bc3b5c3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java @@ -17,24 +17,99 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; -import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FAILOVER_IPFAILOVER_VIRTUAL_ADDRESS; /** - * ObserverReadProxyProvider with IPFailoverProxyProvider - * as the failover method. 
+ * Extends {@link ObserverReadProxyProvider} to support NameNode IP failover. + * + * For Observer reads a client needs to know physical addresses of all + * NameNodes, so that it can switch between active and observer nodes + * for write and read requests. + * + * Traditional {@link IPFailoverProxyProvider} works with a virtual + * address of the NameNode. If the active NameNode fails, the virtual address + * is assigned to the standby NameNode, and IPFailoverProxyProvider, which + * keeps talking to the same virtual address, in fact now connects to + * the new physical server. + * + * To combine these behaviors ObserverReadProxyProviderWithIPFailover + * should both + *
    + *
  1. Maintain all physical addresses of NameNodes in order to allow + * observer reads, and + *
  2. Rely on the virtual address of the NameNode in order to + * perform failover by assuming that the virtual address always points + * to the active NameNode. + *
+ * + * An example of a configuration to leverage + * ObserverReadProxyProviderWithIPFailover + * should include the following values: + *
{@code
+ * fs.defaultFS = hdfs://mycluster
+ * dfs.nameservices = mycluster
+ * dfs.ha.namenodes.mycluster = ha1,ha2
+ * dfs.namenode.rpc-address.mycluster.ha1 = nn01-ha1.com:8020
+ * dfs.namenode.rpc-address.mycluster.ha2 = nn01-ha2.com:8020
+ * dfs.client.failover.ipfailover.virtual-address.mycluster = nn01.com:8020
+ * dfs.client.failover.proxy.provider.mycluster =
+ *     org.apache...ObserverReadProxyProviderWithIPFailover
+ * }
+ * Here {@code nn01.com:8020} is the virtual address of the active NameNode, + * while {@code nn01-ha1.com:8020} and {@code nn01-ha2.com:8020} + * are the physical addresses of the two NameNodes. + * + * With this configuration, client will use + * ObserverReadProxyProviderWithIPFailover, which creates proxies for both + * nn01-ha1 and nn01-ha2, used for read/write RPC calls, but for the failover, + * it relies on the virtual address nn01.com */ -public class -ObserverReadProxyProviderWithIPFailover -extends ObserverReadProxyProvider { + +public class ObserverReadProxyProviderWithIPFailover + extends ObserverReadProxyProvider { + private static final Logger LOG = LoggerFactory.getLogger( + ObserverReadProxyProviderWithIPFailover.class); + + /** + * By default ObserverReadProxyProviderWithIPFailover + * uses {@link IPFailoverProxyProvider} for failover. + */ + public ObserverReadProxyProviderWithIPFailover( + Configuration conf, URI uri, Class xface, HAProxyFactory factory) { + this(conf, uri, xface, factory, + new IPFailoverProxyProvider<>(conf, + getFailoverVirtualIP(conf, uri.getHost()), xface, factory)); + } + + @Override + public boolean useLogicalURI() { + return true; + } public ObserverReadProxyProviderWithIPFailover( - Configuration conf, URI uri, Class xface, - HAProxyFactory factory) throws IOException { - super(conf, uri, xface, factory, - new IPFailoverProxyProvider(conf, uri, xface,factory)); + Configuration conf, URI uri, Class xface, HAProxyFactory factory, + AbstractNNFailoverProxyProvider failoverProxy) { + super(conf, uri, xface, factory, failoverProxy); } -} \ No newline at end of file + + private static URI getFailoverVirtualIP( + Configuration conf, String nameServiceID) { + String configKey = DFS_CLIENT_FAILOVER_IPFAILOVER_VIRTUAL_ADDRESS + + "." 
+ nameServiceID; + String virtualIP = conf.get(configKey); + LOG.info("Name service ID {} will use virtual IP {} for failover", + nameServiceID, virtualIP); + if (virtualIP == null) { + throw new IllegalArgumentException("Virtual IP for failover not found," + + "misconfigured " + configKey + "?"); + } + return URI.create(virtualIP); + } +}