HDFS-14017. [SBN read] ObserverReadProxyProviderWithIPFailover should work with HA configuration. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2018-11-16 17:30:29 -08:00
parent 683daedc1f
commit 96cdd13de5
2 changed files with 89 additions and 11 deletions

View File

@ -181,6 +181,9 @@ public interface HdfsClientConfigKeys {
String DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES = String DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES =
"dfs.namenode.snapshot.capture.openfiles"; "dfs.namenode.snapshot.capture.openfiles";
boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false; boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false;
String DFS_CLIENT_FAILOVER_IPFAILOVER_VIRTUAL_ADDRESS =
Failover.PREFIX + "ipfailover.virtual-address";
/** /**
* These are deprecated config keys to client code. * These are deprecated config keys to client code.

View File

@ -17,24 +17,99 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FAILOVER_IPFAILOVER_VIRTUAL_ADDRESS;
/** /**
* ObserverReadProxyProvider with IPFailoverProxyProvider * Extends {@link ObserverReadProxyProvider} to support NameNode IP failover.
* as the failover method. *
* For Observer reads a client needs to know physical addresses of all
* NameNodes, so that it could switch between active and observer nodes
* for write and read requests.
*
* Traditional {@link IPFailoverProxyProvider} works with a virtual
* address of the NameNode. If active NameNode fails the virtual address
* is assigned to the standby NameNode, and IPFailoverProxyProvider, which
* keeps talking to the same virtual address is in fact now connects to
* the new physical server.
*
* To combine these behaviors ObserverReadProxyProviderWithIPFailover
* should both
* <ol>
* <li> Maintain all physical addresses of NameNodes in order to allow
* observer reads, and
* <li> Should rely on the virtual address of the NameNode in order to
* perform failover by assuming that the virtual address always points
* to the active NameNode.
* </ol>
*
* An example of a configuration to leverage
* ObserverReadProxyProviderWithIPFailover
* should include the following values:
* <pre>{@code
* fs.defaultFS = hdfs://mycluster
* dfs.nameservices = mycluster
* dfs.ha.namenodes.mycluster = ha1,ha2
* dfs.namenode.rpc-address.mycluster.ha1 = nn01-ha1.com:8020
* dfs.namenode.rpc-address.mycluster.ha2 = nn01-ha2.com:8020
* dfs.client.failover.ipfailover.virtual-address.mycluster = nn01.com:8020
* dfs.client.failover.proxy.provider.mycluster =
* org.apache...ObserverReadProxyProviderWithIPFailover
* }</pre>
* Here {@code nn01.com:8020} is the virtual address of the active NameNode,
* while {@code nn01-ha1.com:8020} and {@code nn01-ha2.com:8020}
* are the physically addresses the two NameNodes.
*
* With this configuration, client will use
* ObserverReadProxyProviderWithIPFailover, which creates proxies for both
* nn01-ha1 and nn01-ha2, used for read/write RPC calls, but for the failover,
* it relies on the virtual address nn01.com
*/ */
public class
ObserverReadProxyProviderWithIPFailover<T extends ClientProtocol> public class ObserverReadProxyProviderWithIPFailover<T extends ClientProtocol>
extends ObserverReadProxyProvider<T> { extends ObserverReadProxyProvider<T> {
private static final Logger LOG = LoggerFactory.getLogger(
ObserverReadProxyProviderWithIPFailover.class);
/**
* By default ObserverReadProxyProviderWithIPFailover
* uses {@link IPFailoverProxyProvider} for failover.
*/
public ObserverReadProxyProviderWithIPFailover(
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
this(conf, uri, xface, factory,
new IPFailoverProxyProvider<>(conf,
getFailoverVirtualIP(conf, uri.getHost()), xface, factory));
}
@Override
public boolean useLogicalURI() {
return true;
}
public ObserverReadProxyProviderWithIPFailover( public ObserverReadProxyProviderWithIPFailover(
Configuration conf, URI uri, Class<T> xface, Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
HAProxyFactory<T> factory) throws IOException { AbstractNNFailoverProxyProvider<T> failoverProxy) {
super(conf, uri, xface, factory, super(conf, uri, xface, factory, failoverProxy);
new IPFailoverProxyProvider<T>(conf, uri, xface,factory));
} }
}
private static URI getFailoverVirtualIP(
Configuration conf, String nameServiceID) {
String configKey = DFS_CLIENT_FAILOVER_IPFAILOVER_VIRTUAL_ADDRESS
+ "." + nameServiceID;
String virtualIP = conf.get(configKey);
LOG.info("Name service ID {} will use virtual IP {} for failover",
nameServiceID, virtualIP);
if (virtualIP == null) {
throw new IllegalArgumentException("Virtual IP for failover not found,"
+ "misconfigured " + configKey + "?");
}
return URI.create(virtualIP);
}
}