HDFS-15148. dfs.namenode.send.qop.enabled should not apply to primary NN port. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2020-02-04 12:12:35 -08:00
parent 10a60fbe20
commit ce7b8b5634
5 changed files with 174 additions and 35 deletions

View File

@ -382,20 +382,28 @@ public static InetAddress getRemoteIp() {
/**
* Returns the SASL qop for the current call, if the current call is
* set, and the SASL negotiation is done. Otherwise return null. Note
* that CurCall is thread local object. So in fact, different handler
* threads will process different CurCall object.
* set, and the SASL negotiation is done. Otherwise return null
* Note this only returns established QOP for auxiliary port, and
* returns null for primary (non-auxiliary) port.
*
* Also note that CurCall is thread local object. So in fact, different
* handler threads will process different CurCall object.
*
* Also, only return for RPC calls, not supported for other protocols.
* @return the QOP of the current connection.
*/
public static String getEstablishedQOP() {
public static String getAuxiliaryPortEstablishedQOP() {
Call call = CurCall.get();
if (call == null || !(call instanceof RpcCall)) {
if (!(call instanceof RpcCall)) {
return null;
}
RpcCall rpcCall = (RpcCall)call;
return rpcCall.connection.getEstablishedQOP();
if (rpcCall.connection.isOnAuxiliaryPort()) {
return rpcCall.connection.getEstablishedQOP();
} else {
// Not sending back QOP for primary port
return null;
}
}
/**
@ -1185,7 +1193,8 @@ private class Listener extends Thread {
private boolean reuseAddr = conf.getBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_REUSEADDR_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_REUSEADDR_DEFAULT);
private boolean isOnAuxiliaryPort;
Listener(int port) throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
@ -1213,6 +1222,11 @@ private class Listener extends Thread {
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
this.isOnAuxiliaryPort = false;
}
void setIsAuxiliary() {
this.isOnAuxiliaryPort = true;
}
private class Reader extends Thread {
@ -1381,7 +1395,8 @@ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOf
channel.socket().setKeepAlive(true);
Reader reader = getReader();
Connection c = connectionManager.register(channel, this.listenPort);
Connection c = connectionManager.register(channel,
this.listenPort, this.isOnAuxiliaryPort);
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
@ -1805,6 +1820,7 @@ public class Connection {
private int serviceClass;
private boolean shouldClose = false;
private int ingressPort;
private boolean isOnAuxiliaryPort;
UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before auth
@ -1817,7 +1833,7 @@ public class Connection {
private boolean useWrap = false;
public Connection(SocketChannel channel, long lastContact,
int ingressPort) {
int ingressPort, boolean isOnAuxiliaryPort) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
@ -1830,6 +1846,7 @@ public Connection(SocketChannel channel, long lastContact,
this.socket = channel.socket();
this.addr = socket.getInetAddress();
this.ingressPort = ingressPort;
this.isOnAuxiliaryPort = isOnAuxiliaryPort;
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
@ -1875,7 +1892,11 @@ public InetAddress getHostInetAddress() {
public String getEstablishedQOP() {
return establishedQOP;
}
public boolean isOnAuxiliaryPort() {
return isOnAuxiliaryPort;
}
public void setLastContact(long lastContact) {
this.lastContact = lastContact;
}
@ -3113,6 +3134,8 @@ public synchronized void addAuxiliaryListener(int auxiliaryPort)
"There is already a listener binding to: " + auxiliaryPort);
}
Listener newListener = new Listener(auxiliaryPort);
newListener.setIsAuxiliary();
// in the case of port = 0, the listener would be on a != 0 port.
LOG.info("Adding a server listener on port " +
newListener.getAddress().getPort());
@ -3732,11 +3755,13 @@ Connection[] toArray() {
return connections.toArray(new Connection[0]);
}
Connection register(SocketChannel channel, int ingressPort) {
Connection register(SocketChannel channel, int ingressPort,
boolean isOnAuxiliaryPort) {
if (isFull()) {
return null;
}
Connection connection = new Connection(channel, Time.now(), ingressPort);
Connection connection = new Connection(channel, Time.now(),
ingressPort, isOnAuxiliaryPort);
add(connection);
if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection +

View File

@ -290,7 +290,7 @@ public Token<BlockTokenIdentifier> generateToken(String userId,
.getBlockPoolId(), block.getBlockId(), modes, storageTypes,
storageIds, useProto);
if (shouldWrapQOP) {
String qop = Server.getEstablishedQOP();
String qop = Server.getAuxiliaryPortEstablishedQOP();
if (qop != null) {
id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
}

View File

@ -1900,17 +1900,6 @@ private static String getClientMachine() {
return clientMachine;
}
/**
* Return the QOP of the client that the current handler thread
* is handling. Assuming the negotiation is done at this point,
* otherwise returns null.
*
* @return the established QOP of this client.
*/
public static String getEstablishedClientQOP() {
return Server.getEstablishedQOP();
}
@Override
public DataEncryptionKey getDataEncryptionKey() throws IOException {
checkNNStartup();

View File

@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hdfs;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@ -77,12 +80,33 @@ public TestBlockTokenWrappingQOP(String configKey, String qopValue) {
@Before
public void setup() throws Exception {
conf = createSecureConfig(this.configKey);
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "12000");
// explicitly setting service rpc for datanode. This because
// DFSUtil.getNNServiceRpcAddressesForCluster looks up client facing port
// and service port at the same time, and if no setting for service
// rpc, it would return client port, in this case, it will be the
// auxiliary port for data node. Which is not what auxiliary is for.
// setting service rpc port to avoid this.
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
conf.set(
CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
"org.apache.hadoop.security.IngressPortBasedResolver");
conf.set("ingress.port.sasl.configured.ports", "12000");
conf.set("ingress.port.sasl.prop.12000", this.configKey);
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
conf.set(HADOOP_RPC_PROTECTION, this.configKey);
cluster = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
HdfsConfiguration clientConf = new HdfsConfiguration(conf);
clientConf.unset(
CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
URI currentURI = cluster.getURI();
URI uriAuxiliary = new URI(currentURI.getScheme() +
"://" + currentURI.getHost() + ":12000");
dfs = (DistributedFileSystem) FileSystem.get(uriAuxiliary, conf);
}
@After
@ -97,7 +121,6 @@ public void testAddBlockWrappingQOP() throws Exception {
final String src = "/testAddBlockWrappingQOP";
final Path path = new Path(src);
dfs = cluster.getFileSystem();
dfs.create(path);
DFSClient client = dfs.getClient();
@ -114,7 +137,6 @@ public void testAppendWrappingQOP() throws Exception {
final String src = "/testAppendWrappingQOP";
final Path path = new Path(src);
dfs = cluster.getFileSystem();
FSDataOutputStream out = dfs.create(path);
// NameNode append call returns a last block instance. If there is nothing
// it returns as a null. So write something, so that lastBlock has
@ -138,7 +160,6 @@ public void testGetBlockLocationWrappingQOP() throws Exception {
final String src = "/testGetBlockLocationWrappingQOP";
final Path path = new Path(src);
dfs = cluster.getFileSystem();
FSDataOutputStream out = dfs.create(path);
// if the file is empty, there will be no blocks returned. Write something
// so that getBlockLocations actually returns some block.

View File

@ -21,13 +21,17 @@
import java.util.ArrayList;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.security.token.Token;
import org.junit.Before;
import org.junit.Test;
@ -77,6 +81,85 @@ public void setup() throws Exception {
clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
}
/**
* Test that when NameNode returns back its established QOP,
* it only does this for auxiliary port(s), not the primary port.
*
* @throws Exception
*/
@Test
public void testAuxiliaryPortSendingQOP() throws Exception {
MiniDFSCluster cluster = null;
final String pathPrefix = "/filetestAuxiliaryPortSendingQOP";
try {
cluster = new MiniDFSCluster.Builder(clusterConf)
.numDataNodes(3).build();
cluster.waitActive();
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.unset(
CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
URI currentURI = cluster.getURI();
URI uriAuthPort = new URI(currentURI.getScheme() + "://" +
currentURI.getHost() + ":12000");
URI uriIntegrityPort = new URI(currentURI.getScheme() + "://" +
currentURI.getHost() + ":12100");
URI uriPrivacyPort = new URI(currentURI.getScheme() +
"://" + currentURI.getHost() + ":12200");
// If connecting to primary port, block token should not include
// handshake secret
byte[] secretOnPrimary = getHandshakeSecret(currentURI, clientConf,
new Path(pathPrefix + "Primary"));
assertTrue(secretOnPrimary == null || secretOnPrimary.length == 0);
// If connecting to auxiliary port, block token should include
// handshake secret
clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
byte[] secretPrivacy = getHandshakeSecret(uriPrivacyPort, clientConf,
new Path(pathPrefix + "Privacy"));
assertTrue(secretPrivacy.length > 0);
clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
byte[] secretIntegrity = getHandshakeSecret(uriIntegrityPort, clientConf,
new Path(pathPrefix + "Integrity"));
assertTrue(secretIntegrity.length > 0);
clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
byte[] secretAuthentication = getHandshakeSecret(uriAuthPort,
clientConf, new Path(pathPrefix + "Authentication"));
assertTrue(secretAuthentication.length > 0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private byte[] getHandshakeSecret(URI uri, HdfsConfiguration conf,
Path path) throws Exception {
FileSystem fs = FileSystem.get(uri, conf);
FSDataOutputStream out = fs.create(
path, false, 4096, (short)1, BLOCK_SIZE);
try {
out.write(0);
out.hflush();
Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
final byte[] tokenBytes = token.getIdentifier();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenBytes, tokenBytes.length);
BlockTokenIdentifier blockToken = new BlockTokenIdentifier();
blockToken.readFields(dib);
return blockToken.getHandshakeMsg();
} finally {
out.close();
}
}
/**
* Test accessing NameNode from three different ports.
*
@ -168,33 +251,54 @@ public void testMultipleNNPortOverwriteDownStream() throws Exception {
clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
doTest(fsPrivacy, PATH1);
// add a wait so that data has reached not only first DN,
// but also the rest
Thread.sleep(100);
for (int i = 0; i < 2; i++) {
DataNode dn = dataNodes.get(i);
SaslDataTransferClient saslClient = dn.getSaslClient();
assertEquals("auth", saslClient.getTargetQOP());
String qop = null;
// It may take some time for the qop to populate
// to all DNs, check in a loop.
for (int trial = 0; trial < 10; trial++) {
qop = saslClient.getTargetQOP();
if (qop != null) {
break;
}
Thread.sleep(100);
}
assertEquals("auth", qop);
}
clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
doTest(fsIntegrity, PATH2);
Thread.sleep(100);
for (int i = 0; i < 2; i++) {
DataNode dn = dataNodes.get(i);
SaslDataTransferClient saslClient = dn.getSaslClient();
assertEquals("auth", saslClient.getTargetQOP());
String qop = null;
for (int trial = 0; trial < 10; trial++) {
qop = saslClient.getTargetQOP();
if (qop != null) {
break;
}
Thread.sleep(100);
}
assertEquals("auth", qop);
}
clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
doTest(fsAuth, PATH3);
Thread.sleep(100);
for (int i = 0; i < 3; i++) {
DataNode dn = dataNodes.get(i);
SaslDataTransferServer saslServer = dn.getSaslServer();
assertEquals("auth", saslServer.getNegotiatedQOP());
String qop = null;
for (int trial = 0; trial < 10; trial++) {
qop = saslServer.getNegotiatedQOP();
if (qop != null) {
break;
}
Thread.sleep(100);
}
assertEquals("auth", qop);
}
} finally {
if (cluster != null) {