HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2018-10-23 14:53:45 -07:00
parent 14265b8f03
commit 3835a6c4e8
11 changed files with 468 additions and 16 deletions

View File

@ -379,6 +379,24 @@ public abstract class Server {
return (call != null ) ? call.getHostInetAddress() : null; return (call != null ) ? call.getHostInetAddress() : null;
} }
/**
* Returns the SASL qop for the current call, if the current call is
* set, and the SASL negotiation is done. Otherwise return null. Note
* that CurCall is thread local object. So in fact, different handler
* threads will process different CurCall object.
*
* Also, only return for RPC calls, not supported for other protocols.
* @return the QOP of the current connection.
*/
public static String getEstablishedQOP() {
Call call = CurCall.get();
if (call == null || !(call instanceof RpcCall)) {
return null;
}
RpcCall rpcCall = (RpcCall)call;
return rpcCall.connection.getEstablishedQOP();
}
/** /**
* Returns the clientId from the current RPC request * Returns the clientId from the current RPC request
*/ */
@ -458,6 +476,10 @@ public abstract class Server {
// maintains the set of client connections and handles idle timeouts // maintains the set of client connections and handles idle timeouts
private ConnectionManager connectionManager; private ConnectionManager connectionManager;
private Listener listener = null; private Listener listener = null;
// Auxiliary listeners maintained as in a map, to allow
// arbitrary number of of auxiliary listeners. A map from
// the port to the listener binding to it.
private Map<Integer, Listener> auxiliaryListenerMap;
private Responder responder = null; private Responder responder = null;
private Handler[] handlers = null; private Handler[] handlers = null;
@ -1148,11 +1170,12 @@ public abstract class Server {
private Reader[] readers = null; private Reader[] readers = null;
private int currentReader = 0; private int currentReader = 0;
private InetSocketAddress address; //the address we bind at private InetSocketAddress address; //the address we bind at
private int listenPort; //the port we bind at
private int backlogLength = conf.getInt( private int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY, CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
public Listener() throws IOException { Listener(int port) throws IOException {
address = new InetSocketAddress(bindAddress, port); address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode // Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open(); acceptChannel = ServerSocketChannel.open();
@ -1160,7 +1183,10 @@ public abstract class Server {
// Bind the server socket to the local host and port // Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port //Could be an ephemeral port
this.listenPort = acceptChannel.socket().getLocalPort();
Thread.currentThread().setName("Listener at " +
bindAddress + "/" + this.listenPort);
// create a selector; // create a selector;
selector= Selector.open(); selector= Selector.open();
readers = new Reader[readThreads]; readers = new Reader[readThreads];
@ -1343,7 +1369,7 @@ public abstract class Server {
channel.socket().setKeepAlive(true); channel.socket().setKeepAlive(true);
Reader reader = getReader(); Reader reader = getReader();
Connection c = connectionManager.register(channel); Connection c = connectionManager.register(channel, this.listenPort);
// If the connectionManager can't take it, close the connection. // If the connectionManager can't take it, close the connection.
if (c == null) { if (c == null) {
if (channel.isOpen()) { if (channel.isOpen()) {
@ -1765,6 +1791,7 @@ public abstract class Server {
private ByteBuffer unwrappedDataLengthBuffer; private ByteBuffer unwrappedDataLengthBuffer;
private int serviceClass; private int serviceClass;
private boolean shouldClose = false; private boolean shouldClose = false;
private int ingressPort;
UserGroupInformation user = null; UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before auth public UserGroupInformation attemptingUser = null; // user name before auth
@ -1776,7 +1803,8 @@ public abstract class Server {
private boolean sentNegotiate = false; private boolean sentNegotiate = false;
private boolean useWrap = false; private boolean useWrap = false;
public Connection(SocketChannel channel, long lastContact) { public Connection(SocketChannel channel, long lastContact,
int ingressPort) {
this.channel = channel; this.channel = channel;
this.lastContact = lastContact; this.lastContact = lastContact;
this.data = null; this.data = null;
@ -1788,6 +1816,7 @@ public abstract class Server {
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4); this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket(); this.socket = channel.socket();
this.addr = socket.getInetAddress(); this.addr = socket.getInetAddress();
this.ingressPort = ingressPort;
if (addr == null) { if (addr == null) {
this.hostAddress = "*Unknown*"; this.hostAddress = "*Unknown*";
} else { } else {
@ -1822,9 +1851,24 @@ public abstract class Server {
return hostAddress; return hostAddress;
} }
public int getIngressPort() {
return ingressPort;
}
public InetAddress getHostInetAddress() { public InetAddress getHostInetAddress() {
return addr; return addr;
} }
public String getEstablishedQOP() {
// In practice, saslServer should not be null when this is
// called. If it is null, it must be either some
// configuration mistake or it is called from unit test.
if (saslServer == null) {
LOG.warn("SASL server should not be null!");
return null;
}
return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
}
public void setLastContact(long lastContact) { public void setLastContact(long lastContact) {
this.lastContact = lastContact; this.lastContact = lastContact;
@ -2303,7 +2347,7 @@ public abstract class Server {
private SaslServer createSaslServer(AuthMethod authMethod) private SaslServer createSaslServer(AuthMethod authMethod)
throws IOException, InterruptedException { throws IOException, InterruptedException {
final Map<String,?> saslProps = final Map<String,?> saslProps =
saslPropsResolver.getServerProperties(addr); saslPropsResolver.getServerProperties(addr, ingressPort);
return new SaslRpcServer(authMethod).create(this, saslProps, secretManager); return new SaslRpcServer(authMethod).create(this, saslProps, secretManager);
} }
@ -2822,7 +2866,8 @@ public abstract class Server {
private class Handler extends Thread { private class Handler extends Thread {
public Handler(int instanceNumber) { public Handler(int instanceNumber) {
this.setDaemon(true); this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber + " on " + port); this.setName("IPC Server handler "+ instanceNumber +
" on default port " + port);
} }
@Override @Override
@ -2984,6 +3029,7 @@ public abstract class Server {
this.handlerCount = handlerCount; this.handlerCount = handlerCount;
this.socketSendBufferSize = 0; this.socketSendBufferSize = 0;
this.serverName = serverName; this.serverName = serverName;
this.auxiliaryListenerMap = null;
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
if (queueSizePerHandler != -1) { if (queueSizePerHandler != -1) {
@ -3023,8 +3069,9 @@ public abstract class Server {
this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods); this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
// Start the listener here and let it bind to the port // Start the listener here and let it bind to the port
listener = new Listener(); listener = new Listener(port);
this.port = listener.getAddress().getPort(); // set the server port to the default listener port.
this.port = listener.getAddress().getPort();
connectionManager = new ConnectionManager(); connectionManager = new ConnectionManager();
this.rpcMetrics = RpcMetrics.create(this, conf); this.rpcMetrics = RpcMetrics.create(this, conf);
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port); this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
@ -3046,7 +3093,23 @@ public abstract class Server {
this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class); this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
} }
public synchronized void addAuxiliaryListener(int auxiliaryPort)
throws IOException {
if (auxiliaryListenerMap == null) {
auxiliaryListenerMap = new HashMap<>();
}
if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) {
throw new IOException(
"There is already a listener binding to: " + auxiliaryPort);
}
Listener newListener = new Listener(auxiliaryPort);
// in the case of port = 0, the listener would be on a != 0 port.
LOG.info("Adding a server listener on port " +
newListener.getAddress().getPort());
auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener);
}
private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods) private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
throws IOException { throws IOException {
RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder(); RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
@ -3283,6 +3346,12 @@ public abstract class Server {
public synchronized void start() { public synchronized void start() {
responder.start(); responder.start();
listener.start(); listener.start();
if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
for (Listener newListener : auxiliaryListenerMap.values()) {
newListener.start();
}
}
handlers = new Handler[handlerCount]; handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) { for (int i = 0; i < handlerCount; i++) {
@ -3304,6 +3373,12 @@ public abstract class Server {
} }
listener.interrupt(); listener.interrupt();
listener.doStop(); listener.doStop();
if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
for (Listener newListener : auxiliaryListenerMap.values()) {
newListener.interrupt();
newListener.doStop();
}
}
responder.interrupt(); responder.interrupt();
notifyAll(); notifyAll();
this.rpcMetrics.shutdown(); this.rpcMetrics.shutdown();
@ -3327,6 +3402,23 @@ public abstract class Server {
public synchronized InetSocketAddress getListenerAddress() { public synchronized InetSocketAddress getListenerAddress() {
return listener.getAddress(); return listener.getAddress();
} }
/**
* Return the set of all the configured auxiliary socket addresses NameNode
* RPC is listening on. If there are none, or it is not configured at all, an
* empty set is returned.
* @return the set of all the auxiliary addresses on which the
* RPC server is listening on.
*/
public synchronized Set<InetSocketAddress> getAuxiliaryListenerAddresses() {
Set<InetSocketAddress> allAddrs = new HashSet<>();
if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
for (Listener auxListener : auxiliaryListenerMap.values()) {
allAddrs.add(auxListener.getAddress());
}
}
return allAddrs;
}
/** /**
* Called for each call. * Called for each call.
@ -3631,11 +3723,11 @@ public abstract class Server {
return connections.toArray(new Connection[0]); return connections.toArray(new Connection[0]);
} }
Connection register(SocketChannel channel) { Connection register(SocketChannel channel, int ingressPort) {
if (isFull()) { if (isFull()) {
return null; return null;
} }
Connection connection = new Connection(channel, Time.now()); Connection connection = new Connection(channel, Time.now(), ingressPort);
add(connection); add(connection);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection + LOG.debug("Server connection from " + connection +

View File

@ -102,7 +102,7 @@ public class SaslPropertiesResolver implements Configurable{
*/ */
public Map<String, String> getServerProperties(InetAddress clientAddress, public Map<String, String> getServerProperties(InetAddress clientAddress,
int ingressPort){ int ingressPort){
return properties; return getServerProperties(clientAddress);
} }
/** /**
@ -122,7 +122,7 @@ public class SaslPropertiesResolver implements Configurable{
*/ */
public Map<String, String> getClientProperties(InetAddress serverAddress, public Map<String, String> getClientProperties(InetAddress serverAddress,
int ingressPort) { int ingressPort) {
return properties; return getClientProperties(serverAddress);
} }
/** /**

View File

@ -49,8 +49,10 @@ import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
@ -171,6 +173,11 @@ public class TestIPC {
this(handlerCount, sleep, LongWritable.class, null); this(handlerCount, sleep, LongWritable.class, null);
} }
public TestServer(int port, int handlerCount, boolean sleep)
throws IOException {
this(port, handlerCount, sleep, LongWritable.class, null);
}
public TestServer(int handlerCount, boolean sleep, Configuration conf) public TestServer(int handlerCount, boolean sleep, Configuration conf)
throws IOException { throws IOException {
this(handlerCount, sleep, LongWritable.class, null, conf); this(handlerCount, sleep, LongWritable.class, null, conf);
@ -182,11 +189,24 @@ public class TestIPC {
this(handlerCount, sleep, paramClass, responseClass, conf); this(handlerCount, sleep, paramClass, responseClass, conf);
} }
public TestServer(int port, int handlerCount, boolean sleep,
Class<? extends Writable> paramClass,
Class<? extends Writable> responseClass) throws IOException {
this(port, handlerCount, sleep, paramClass, responseClass, conf);
}
public TestServer(int handlerCount, boolean sleep, public TestServer(int handlerCount, boolean sleep,
Class<? extends Writable> paramClass, Class<? extends Writable> paramClass,
Class<? extends Writable> responseClass, Configuration conf) Class<? extends Writable> responseClass, Configuration conf)
throws IOException { throws IOException {
super(ADDRESS, 0, paramClass, handlerCount, conf); this(0, handlerCount, sleep, paramClass, responseClass, conf);
}
public TestServer(int port, int handlerCount, boolean sleep,
Class<? extends Writable> paramClass,
Class<? extends Writable> responseClass, Configuration conf)
throws IOException {
super(ADDRESS, port, paramClass, handlerCount, conf);
this.sleep = sleep; this.sleep = sleep;
this.responseClass = responseClass; this.responseClass = responseClass;
} }
@ -338,6 +358,37 @@ public class TestIPC {
} }
server.stop(); server.stop();
} }
@Test
public void testAuxiliaryPorts() throws IOException, InterruptedException {
int defaultPort = 9000;
int[] auxiliaryPorts = {9001, 9002, 9003};
final int handlerCount = 5;
final boolean handlerSleep = false;
Server server = new TestServer(defaultPort, handlerCount, handlerSleep);
for (int port : auxiliaryPorts) {
server.addAuxiliaryListener(port);
}
Set<InetSocketAddress> listenerAddrs =
server.getAuxiliaryListenerAddresses();
Set<InetSocketAddress> addrs = new HashSet<>();
for (InetSocketAddress addr : listenerAddrs) {
addrs.add(NetUtils.getConnectAddress(addr));
}
server.start();
Client client = new Client(LongWritable.class, conf);
Set<SerialCaller> calls = new HashSet<>();
for (InetSocketAddress addr : addrs) {
calls.add(new SerialCaller(client, addr, 100));
}
for (SerialCaller caller : calls) {
caller.join();
assertFalse(caller.failed);
}
client.stop();
server.stop();
}
@Test(timeout=60000) @Test(timeout=60000)
public void testStandAloneClient() throws IOException { public void testStandAloneClient() throws IOException {

View File

@ -21,6 +21,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.primitives.SignedBytes; import com.google.common.primitives.SignedBytes;
import java.net.URISyntaxException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
@ -92,6 +93,8 @@ import java.util.Arrays;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@InterfaceAudience.Private @InterfaceAudience.Private
@ -432,7 +435,7 @@ public class DFSUtilClient {
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap(); Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) { for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId); String suffix = concatSuffixes(nsId, nnId);
String address = getConfValue(defaultValue, suffix, conf, keys); String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
if (address != null) { if (address != null) {
InetSocketAddress isa = NetUtils.createSocketAddr(address); InetSocketAddress isa = NetUtils.createSocketAddr(address);
if (isa.isUnresolved()) { if (isa.isUnresolved()) {
@ -446,6 +449,86 @@ public class DFSUtilClient {
return ret; return ret;
} }
/**
* Return address from configuration. Take a list of keys as preference.
* If the address to be returned is the value of DFS_NAMENODE_RPC_ADDRESS_KEY,
* will check to see if auxiliary ports are enabled. If so, call to replace
* address port with auxiliary port. If the address is not the value of
* DFS_NAMENODE_RPC_ADDRESS_KEY, return the original address. If failed to
* find any address, return the given default value.
*
* @param defaultValue the default value if no values found for given keys
* @param suffix suffix to append to keys
* @param conf the configuration
* @param keys a list of keys, ordered by preference
* @return
*/
private static String checkKeysAndProcess(String defaultValue, String suffix,
Configuration conf, String... keys) {
String succeededKey = null;
String address = null;
for (String key : keys) {
address = getConfValue(null, suffix, conf, key);
if (address != null) {
succeededKey = key;
break;
}
}
String ret;
if (address == null) {
ret = defaultValue;
} else if(DFS_NAMENODE_RPC_ADDRESS_KEY.equals(succeededKey)) {
ret = checkRpcAuxiliary(conf, suffix, address);
} else {
ret = address;
}
return ret;
}
/**
* Check if auxiliary port is enabled, if yes, check if the given address
* should have its port replaced by an auxiliary port. If the given address
* does not contain a port, append the auxiliary port to enforce using it.
*
* @param conf configuration.
* @param address the address to check and modify (if needed).
* @return the new modified address containing auxiliary port, or original
* address if auxiliary port not enabled.
*/
private static String checkRpcAuxiliary(Configuration conf, String suffix,
String address) {
String key = DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
key = addSuffix(key, suffix);
int[] ports = conf.getInts(key);
if (ports == null || ports.length == 0) {
return address;
}
LOG.info("Using server auxiliary ports " + Arrays.toString(ports));
URI uri;
try {
uri = new URI(address);
} catch (URISyntaxException e) {
// return the original address untouched if it is not a valid URI. This
// happens in unit test, as MiniDFSCluster sets the value to
// 127.0.0.1:0, without schema (i.e. "hdfs://"). While in practice, this
// should not be the case. So log a warning message here.
LOG.warn("NameNode address is not a valid uri:" + address);
return address;
}
// Ignore the port, only take the schema(e.g. hdfs) and host (e.g.
// localhost), then append port
// TODO : revisit if there is a better way
StringBuilder sb = new StringBuilder();
sb.append(uri.getScheme());
sb.append("://");
sb.append(uri.getHost());
sb.append(":");
// TODO : currently, only the very first auxiliary port is being used.
// But actually NN supports running multiple auxiliary
sb.append(ports[0]);
return sb.toString();
}
/** /**
* Given a list of keys in the order of preference, returns a value * Given a list of keys in the order of preference, returns a value
* for the key in the given order from the configuration. * for the key in the given order from the configuration.

View File

@ -68,6 +68,11 @@ public interface HdfsClientConfigKeys {
String PREFIX = "dfs.client."; String PREFIX = "dfs.client.";
String DFS_NAMESERVICES = "dfs.nameservices"; String DFS_NAMESERVICES = "dfs.nameservices";
String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports";
String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY
+ "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX;
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870; int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871; int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;

View File

@ -1231,6 +1231,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final Class<DFSNetworkTopology> DFS_NET_TOPOLOGY_IMPL_DEFAULT = public static final Class<DFSNetworkTopology> DFS_NET_TOPOLOGY_IMPL_DEFAULT =
DFSNetworkTopology.class; DFSNetworkTopology.class;
public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated @Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

View File

@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
@ -1058,6 +1059,14 @@ public class NameNode extends ReconfigurableBase implements
return rpcServer.getRpcAddress(); return rpcServer.getRpcAddress();
} }
/**
* @return The auxiliary nameNode RPC addresses, or empty set if there
* is none.
*/
public Set<InetSocketAddress> getAuxiliaryNameNodeAddresses() {
return rpcServer.getAuxiliaryRpcAddresses();
}
/** /**
* @return NameNode RPC address in "host:port" string form * @return NameNode RPC address in "host:port" string form
*/ */
@ -1065,6 +1074,27 @@ public class NameNode extends ReconfigurableBase implements
return NetUtils.getHostPortString(getNameNodeAddress()); return NetUtils.getHostPortString(getNameNodeAddress());
} }
/**
* Return a host:port format string corresponds to an auxiliary
* port configured on NameNode. If there are multiple auxiliary ports,
* an arbitrary one is returned. If there is no auxiliary listener, returns
* null.
*
* @return a string of format host:port that points to an auxiliary NameNode
* address, or null if there is no such address.
*/
@VisibleForTesting
public String getNNAuxiliaryRpcAddress() {
Set<InetSocketAddress> auxiliaryAddrs = getAuxiliaryNameNodeAddresses();
if (auxiliaryAddrs.isEmpty()) {
return null;
}
// since set has no particular order, returning the first element of
// from the iterator is effectively arbitrary.
InetSocketAddress addr = auxiliaryAddrs.iterator().next();
return NetUtils.getHostPortString(addr);
}
/** /**
* @return NameNode service RPC address if configured, the * @return NameNode service RPC address if configured, the
* NameNode RPC address otherwise * NameNode RPC address otherwise

View File

@ -26,6 +26,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
@ -538,6 +539,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (lifelineRpcServer != null) { if (lifelineRpcServer != null) {
lifelineRpcServer.setTracer(nn.tracer); lifelineRpcServer.setTracer(nn.tracer);
} }
int[] auxiliaryPorts =
conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
if (auxiliaryPorts != null && auxiliaryPorts.length != 0) {
for (int auxiliaryPort : auxiliaryPorts) {
this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
}
}
} }
/** Allow access to the lifeline RPC server for testing */ /** Allow access to the lifeline RPC server for testing */
@ -607,10 +615,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return serviceRPCAddress; return serviceRPCAddress;
} }
InetSocketAddress getRpcAddress() { @VisibleForTesting
public InetSocketAddress getRpcAddress() {
return clientRpcAddress; return clientRpcAddress;
} }
@VisibleForTesting
public Set<InetSocketAddress> getAuxiliaryRpcAddresses() {
return clientRpcServer.getAuxiliaryListenerAddresses();
}
private static UserGroupInformation getRemoteUser() throws IOException { private static UserGroupInformation getRemoteUser() throws IOException {
return NameNode.getRemoteUser(); return NameNode.getRemoteUser();
} }

View File

@ -5044,6 +5044,17 @@
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.rpc-address.auxiliary-ports</name>
<value></value>
<description>
A comma separated list of auxiliary ports for the NameNode to listen on.
This allows exposing multiple NN addresses to clients.
Particularly, it is used to enforce different SASL levels on different ports.
Empty list indicates that auxiliary ports are disabled.
</description>
</property>
<property> <property>
<name>dfs.namenode.blockreport.queue.size</name> <name>dfs.namenode.blockreport.queue.size</name>
<value>1024</value> <value>1024</value>

View File

@ -1361,6 +1361,21 @@ public class MiniDFSCluster implements AutoCloseable {
} }
return uri; return uri;
} }
URI getURIForAuxiliaryPort(int nnIndex) {
String hostPort =
getNN(nnIndex).nameNode.getNNAuxiliaryRpcAddress();
if (hostPort == null) {
throw new RuntimeException("No auxiliary port found");
}
URI uri = null;
try {
uri = new URI("hdfs://" + hostPort);
} catch (URISyntaxException e) {
NameNode.LOG.warn("unexpected URISyntaxException", e);
}
return uri;
}
public int getInstanceId() { public int getInstanceId() {
return instanceId; return instanceId;
@ -1961,6 +1976,14 @@ public class MiniDFSCluster implements AutoCloseable {
checkSingleNameNode(); checkSingleNameNode();
return getNameNodePort(0); return getNameNodePort(0);
} }
/**
* Get the auxiliary port of NameNode, NameNode specified by index.
*/
public int getNameNodeAuxiliaryPort() {
checkSingleNameNode();
return getNameNodeAuxiliaryPort(0);
}
/** /**
* Gets the rpc port used by the NameNode at the given index, because the * Gets the rpc port used by the NameNode at the given index, because the
@ -1970,6 +1993,22 @@ public class MiniDFSCluster implements AutoCloseable {
return getNN(nnIndex).nameNode.getNameNodeAddress().getPort(); return getNN(nnIndex).nameNode.getNameNodeAddress().getPort();
} }
/**
* Gets the rpc port used by the NameNode at the given index, if the
* NameNode has multiple auxiliary ports configured, a arbitrary
* one is returned.
*/
public int getNameNodeAuxiliaryPort(int nnIndex) {
Set<InetSocketAddress> allAuxiliaryAddresses =
getNN(nnIndex).nameNode.getAuxiliaryNameNodeAddresses();
if (allAuxiliaryAddresses.isEmpty()) {
return -1;
} else {
InetSocketAddress addr = allAuxiliaryAddresses.iterator().next();
return addr.getPort();
}
}
/** /**
* @return the service rpc port used by the NameNode at the given index. * @return the service rpc port used by the NameNode at the given index.
*/ */
@ -2531,6 +2570,12 @@ public class MiniDFSCluster implements AutoCloseable {
return getFileSystem(0); return getFileSystem(0);
} }
public DistributedFileSystem getFileSystemFromAuxiliaryPort()
throws IOException {
checkSingleNameNode();
return getFileSystemFromAuxiliaryPort(0);
}
/** /**
* Get a client handle to the DFS cluster for the namenode at given index. * Get a client handle to the DFS cluster for the namenode at given index.
*/ */
@ -2539,6 +2584,12 @@ public class MiniDFSCluster implements AutoCloseable {
getNN(nnIndex).conf)); getNN(nnIndex).conf));
} }
public DistributedFileSystem getFileSystemFromAuxiliaryPort(int nnIndex)
throws IOException {
return (DistributedFileSystem) addFileSystem(FileSystem.get(
getURIForAuxiliaryPort(nnIndex), getNN(nnIndex).conf));
}
/** /**
* Get another FileSystem instance that is different from FileSystem.get(conf). * Get another FileSystem instance that is different from FileSystem.get(conf).
* This simulating different threads working on different FileSystem instances. * This simulating different threads working on different FileSystem instances.

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.junit.Test;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test NN auxiliary port with HA.
*/
public class TestHAAuxiliaryPort {
@Test
public void testTest() throws Exception {
Configuration conf = new Configuration();
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
"9000,9001");
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn2",
"9000,9001");
conf.set(DFS_NAMESERVICES, "ha-nn-uri-0");
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ha-nn-uri-0", "nn1,nn2");
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0")
.addNN(new MiniDFSNNTopology.NNConf("nn1"))
.addNN(new MiniDFSNNTopology.NNConf("nn2")));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.build();
cluster.transitionToActive(0);
cluster.waitActive();
NameNode nn0 = cluster.getNameNode(0);
NameNode nn1 = cluster.getNameNode(1);
// all the addresses below are valid nn0 addresses
NameNodeRpcServer rpcServer0 = (NameNodeRpcServer)nn0.getRpcServer();
InetSocketAddress server0RpcAddress = rpcServer0.getRpcAddress();
Set<InetSocketAddress> auxAddrServer0 =
rpcServer0.getAuxiliaryRpcAddresses();
assertEquals(2, auxAddrServer0.size());
// all the addresses below are valid nn1 addresses
NameNodeRpcServer rpcServer1 = (NameNodeRpcServer)nn1.getRpcServer();
InetSocketAddress server1RpcAddress = rpcServer1.getRpcAddress();
Set<InetSocketAddress> auxAddrServer1 =
rpcServer1.getAuxiliaryRpcAddresses();
assertEquals(2, auxAddrServer1.size());
// mkdir on nn0 uri 0
URI nn0URI = new URI("hdfs://localhost:" +
server0RpcAddress.getPort());
try (DFSClient client0 = new DFSClient(nn0URI, conf)){
client0.mkdirs("/test", null, true);
// should be available on other ports also
for (InetSocketAddress auxAddr : auxAddrServer0) {
nn0URI = new URI("hdfs://localhost:" + auxAddr.getPort());
try (DFSClient clientTmp = new DFSClient(nn0URI, conf)) {
assertTrue(clientTmp.exists("/test"));
}
}
}
// now perform a failover
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
// then try to read the file from the nn1
URI nn1URI = new URI("hdfs://localhost:" +
server1RpcAddress.getPort());
try (DFSClient client1 = new DFSClient(nn1URI, conf)) {
assertTrue(client1.exists("/test"));
// should be available on other ports also
for (InetSocketAddress auxAddr : auxAddrServer1) {
nn1URI = new URI("hdfs://localhost:" + auxAddr.getPort());
try (DFSClient clientTmp = new DFSClient(nn1URI, conf)) {
assertTrue(client1.exists("/test"));
}
}
}
}
}