HDFS-15148. dfs.namenode.send.qop.enabled should not apply to primary NN port. Contributed by Chen Liang.

Author: Chen Liang
Date: 2020-02-04 12:59:31 -08:00
parent fb8ee3228e
commit b97fdb7b4c
5 changed files with 175 additions and 37 deletions
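
For context, a minimal Java sketch of the configuration this change is about, mirroring the test setup further down. The ingress.port.sasl.* keys and dfs.namenode.send.qop.enabled appear in this commit; the other key strings are assumed values of the config constants referenced in the diff (DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS):

import org.apache.hadoop.hdfs.HdfsConfiguration;

// Sketch only: a NameNode with one auxiliary RPC port (12000) whose SASL
// QOP is pinned via IngressPortBasedResolver, and with QOP wrapping in
// block tokens enabled. After this commit, the wrapped QOP is returned
// only to clients connecting through port 12000, never to clients on the
// primary RPC port.
public class AuxiliaryQopConfigSketch {
  public static HdfsConfiguration build() {
    HdfsConfiguration conf = new HdfsConfiguration();
    // assumed value of DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY
    conf.set("dfs.namenode.rpc-address.auxiliary-ports", "12000");
    // assumed value of HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS
    conf.set("hadoop.security.saslproperties.resolver.class",
        "org.apache.hadoop.security.IngressPortBasedResolver");
    conf.set("ingress.port.sasl.configured.ports", "12000");
    conf.set("ingress.port.sasl.prop.12000", "privacy");
    conf.setBoolean("dfs.namenode.send.qop.enabled", true);
    return conf;
  }
}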

org/apache/hadoop/ipc/Server.java

@@ -375,20 +375,28 @@ public static InetAddress getRemoteIp() {
   /**
    * Returns the SASL qop for the current call, if the current call is
-   * set, and the SASL negotiation is done. Otherwise return null. Note
-   * that CurCall is thread local object. So in fact, different handler
-   * threads will process different CurCall object.
+   * set, and the SASL negotiation is done. Otherwise return null.
+   * Note this only returns the established QOP for an auxiliary port, and
+   * returns null for the primary (non-auxiliary) port.
+   *
+   * Also note that CurCall is a thread local object. So in fact, different
+   * handler threads will process different CurCall objects.
    *
    * Also, only return for RPC calls, not supported for other protocols.
    * @return the QOP of the current connection.
    */
-  public static String getEstablishedQOP() {
+  public static String getAuxiliaryPortEstablishedQOP() {
     Call call = CurCall.get();
-    if (call == null || !(call instanceof RpcCall)) {
+    if (!(call instanceof RpcCall)) {
       return null;
     }
     RpcCall rpcCall = (RpcCall)call;
-    return rpcCall.connection.getEstablishedQOP();
+    if (rpcCall.connection.isOnAuxiliaryPort()) {
+      return rpcCall.connection.getEstablishedQOP();
+    } else {
+      // Not sending back QOP for primary port
+      return null;
+    }
   }

   /**
@@ -1169,7 +1177,8 @@ private class Listener extends Thread {
     private int backlogLength = conf.getInt(
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
+    private boolean isOnAuxiliaryPort;

     Listener(int port) throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
@@ -1196,6 +1205,11 @@ private class Listener extends Thread {
       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
       this.setName("IPC Server listener on " + port);
       this.setDaemon(true);
+      this.isOnAuxiliaryPort = false;
+    }
+
+    void setIsAuxiliary() {
+      this.isOnAuxiliaryPort = true;
     }

     private class Reader extends Thread {
@@ -1364,7 +1378,8 @@ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
       channel.socket().setKeepAlive(true);

       Reader reader = getReader();
-      Connection c = connectionManager.register(channel, this.listenPort);
+      Connection c = connectionManager.register(channel,
+          this.listenPort, this.isOnAuxiliaryPort);
       // If the connectionManager can't take it, close the connection.
       if (c == null) {
         if (channel.isOpen()) {
@@ -1788,6 +1803,7 @@ public class Connection {
     private int serviceClass;
     private boolean shouldClose = false;
     private int ingressPort;
+    private boolean isOnAuxiliaryPort;

     UserGroupInformation user = null;
     public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1800,7 +1816,7 @@ public class Connection {
     private boolean useWrap = false;

     public Connection(SocketChannel channel, long lastContact,
-        int ingressPort) {
+        int ingressPort, boolean isOnAuxiliaryPort) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -1810,6 +1826,7 @@ public Connection(SocketChannel channel, long lastContact,
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
       this.ingressPort = ingressPort;
+      this.isOnAuxiliaryPort = isOnAuxiliaryPort;
       if (addr == null) {
         this.hostAddress = "*Unknown*";
       } else {
@@ -1855,7 +1872,11 @@ public InetAddress getHostInetAddress() {
     public String getEstablishedQOP() {
       return establishedQOP;
     }
+
+    public boolean isOnAuxiliaryPort() {
+      return isOnAuxiliaryPort;
+    }

     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
     }
@@ -3016,6 +3037,8 @@ public synchronized void addAuxiliaryListener(int auxiliaryPort)
           "There is already a listener binding to: " + auxiliaryPort);
     }
     Listener newListener = new Listener(auxiliaryPort);
+    newListener.setIsAuxiliary();
+
     // in the case of port = 0, the listener would be on a != 0 port.
     LOG.info("Adding a server listener on port " +
         newListener.getAddress().getPort());
@@ -3635,11 +3658,13 @@ Connection[] toArray() {
       return connections.toArray(new Connection[0]);
     }

-    Connection register(SocketChannel channel, int ingressPort) {
+    Connection register(SocketChannel channel, int ingressPort,
+        boolean isOnAuxiliaryPort) {
       if (isFull()) {
         return null;
       }
-      Connection connection = new Connection(channel, Time.now(), ingressPort);
+      Connection connection = new Connection(channel, Time.now(),
+          ingressPort, isOnAuxiliaryPort);
       add(connection);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server connection from " + connection +

org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java

@@ -272,7 +272,7 @@ public Token<BlockTokenIdentifier> generateToken(String userId,
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
         .getBlockPoolId(), block.getBlockId(), modes);
     if (shouldWrapQOP) {
-      String qop = Server.getEstablishedQOP();
+      String qop = Server.getAuxiliaryPortEstablishedQOP();
       if (qop != null) {
         id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
       }
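
With the rename, the null contract of this call gains one more case. A hedged sketch of what generateToken() now observes; the helper is hypothetical, only getAuxiliaryPortEstablishedQOP() comes from the diff:

import org.apache.hadoop.ipc.Server;

// Hypothetical helper illustrating the null cases after this commit.
static boolean shouldWrapHandshakeSecret() {
  String qop = Server.getAuxiliaryPortEstablishedQOP();
  // null means: SASL not yet negotiated, not an RPC call, or (new with
  // this commit) the client arrived on the primary, non-auxiliary port.
  return qop != null; // otherwise e.g. "auth", "auth-int" or "auth-conf"
}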

org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -1776,17 +1776,6 @@ private static String getClientMachine() {
     return clientMachine;
   }

-  /**
-   * Return the QOP of the client that the current handler thread
-   * is handling. Assuming the negotiation is done at this point,
-   * otherwise returns null.
-   *
-   * @return the established QOP of this client.
-   */
-  public static String getEstablishedClientQOP() {
-    return Server.getEstablishedQOP();
-  }
-
   @Override
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
     checkNNStartup();

org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java

@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hdfs;

+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -40,8 +43,7 @@
 import org.junit.runners.Parameterized;

 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -80,12 +82,33 @@ public TestBlockTokenWrappingQOP(String configKey, String qopValue) {
   @Before
   public void setup() throws Exception {
     conf = createSecureConfig(this.configKey);
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "12000");
+    // Explicitly set the service RPC address for the DataNodes. This is
+    // needed because DFSUtil.getNNServiceRpcAddressesForCluster looks up
+    // the client-facing port and the service port at the same time; if no
+    // service RPC address is set, it returns the client port, which in
+    // this case would be the auxiliary port for the DataNodes. That is not
+    // what the auxiliary port is for, so set the service RPC port to
+    // avoid this.
+    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
+    conf.set(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+        "org.apache.hadoop.security.IngressPortBasedResolver");
+    conf.set("ingress.port.sasl.configured.ports", "12000");
+    conf.set("ingress.port.sasl.prop.12000", this.configKey);
     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
     conf.set(HADOOP_RPC_PROTECTION, this.configKey);
     cluster = null;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
+
+    HdfsConfiguration clientConf = new HdfsConfiguration(conf);
+    clientConf.unset(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+    URI currentURI = cluster.getURI();
+    URI uriAuxiliary = new URI(currentURI.getScheme() +
+        "://" + currentURI.getHost() + ":12000");
+    dfs = (DistributedFileSystem) FileSystem.get(uriAuxiliary, conf);
   }

   @After
@@ -100,7 +123,6 @@ public void testAddBlockWrappingQOP() throws Exception {
     final String src = "/testAddBlockWrappingQOP";
     final Path path = new Path(src);

-    dfs = cluster.getFileSystem();
     dfs.create(path);

     DFSClient client = dfs.getClient();
@@ -117,7 +139,6 @@ public void testAppendWrappingQOP() throws Exception {
     final String src = "/testAppendWrappingQOP";
     final Path path = new Path(src);

-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // NameNode append call returns a last block instance. If there is nothing
     // it returns as a null. So write something, so that lastBlock has
@@ -141,7 +162,6 @@ public void testGetBlockLocationWrappingQOP() throws Exception {
     final String src = "/testGetBlockLocationWrappingQOP";
     final Path path = new Path(src);

-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // if the file is empty, there will be no blocks returned. Write something
     // so that getBlockLocations actually returns some block.

org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java

@@ -21,13 +21,17 @@
 import java.util.ArrayList;

 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.security.token.Token;

 import org.junit.Before;
 import org.junit.Test;
@@ -77,6 +81,85 @@ public void setup() throws Exception {
     clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
   }

+  /**
+   * Test that when the NameNode returns its established QOP,
+   * it does so only for auxiliary port(s), not the primary port.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAuxiliaryPortSendingQOP() throws Exception {
+    MiniDFSCluster cluster = null;
+    final String pathPrefix = "/filetestAuxiliaryPortSendingQOP";
+    try {
+      cluster = new MiniDFSCluster.Builder(clusterConf)
+          .numDataNodes(3).build();
+      cluster.waitActive();
+
+      HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+      clientConf.unset(
+          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+      URI currentURI = cluster.getURI();
+      URI uriAuthPort = new URI(currentURI.getScheme() + "://" +
+          currentURI.getHost() + ":12000");
+      URI uriIntegrityPort = new URI(currentURI.getScheme() + "://" +
+          currentURI.getHost() + ":12100");
+      URI uriPrivacyPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12200");
+
+      // If connecting to primary port, block token should not include
+      // handshake secret
+      byte[] secretOnPrimary = getHandshakeSecret(currentURI, clientConf,
+          new Path(pathPrefix + "Primary"));
+      assertTrue(secretOnPrimary == null || secretOnPrimary.length == 0);
+
+      // If connecting to auxiliary port, block token should include
+      // handshake secret
+      clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+      byte[] secretPrivacy = getHandshakeSecret(uriPrivacyPort, clientConf,
+          new Path(pathPrefix + "Privacy"));
+      assertTrue(secretPrivacy.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+      byte[] secretIntegrity = getHandshakeSecret(uriIntegrityPort, clientConf,
+          new Path(pathPrefix + "Integrity"));
+      assertTrue(secretIntegrity.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+      byte[] secretAuthentication = getHandshakeSecret(uriAuthPort,
+          clientConf, new Path(pathPrefix + "Authentication"));
+      assertTrue(secretAuthentication.length > 0);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private byte[] getHandshakeSecret(URI uri, HdfsConfiguration conf,
+      Path path) throws Exception {
+    FileSystem fs = FileSystem.get(uri, conf);
+    FSDataOutputStream out = fs.create(
+        path, false, 4096, (short)1, BLOCK_SIZE);
+    try {
+      out.write(0);
+      out.hflush();
+      Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
+      final byte[] tokenBytes = token.getIdentifier();
+      DataInputBuffer dib = new DataInputBuffer();
+      dib.reset(tokenBytes, tokenBytes.length);
+      BlockTokenIdentifier blockToken = new BlockTokenIdentifier();
+      blockToken.readFields(dib);
+      return blockToken.getHandshakeMsg();
+    } finally {
+      out.close();
+    }
+  }
+
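
Since the NameNode wraps the negotiated QOP as plain UTF-8 bytes (see the BlockTokenSecretManager hunk above), a test could go one step further and assert on the decoded value. A sketch, assuming the standard SASL QOP strings:

import java.nio.charset.StandardCharsets;

// "privacy" negotiates "auth-conf", "integrity" negotiates "auth-int",
// and "authentication" negotiates "auth".
private static String decodeQop(byte[] handshakeSecret) {
  return new String(handshakeSecret, StandardCharsets.UTF_8);
}
// e.g. assertEquals("auth-conf", decodeQop(secretPrivacy));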
 /**
  * Test accessing NameNode from three different ports.
  *
@@ -168,33 +251,54 @@ public void testMultipleNNPortOverwriteDownStream() throws Exception {
       clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
       FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
       doTest(fsPrivacy, PATH1);
-      // add a wait so that data has reached not only first DN,
-      // but also the rest
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        // It may take some time for the qop to propagate
+        // to all DNs; check in a loop.
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }

       clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
       FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
       doTest(fsIntegrity, PATH2);
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }

       clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
       FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
       doTest(fsAuth, PATH3);
-      Thread.sleep(100);
       for (int i = 0; i < 3; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferServer saslServer = dn.getSaslServer();
-        assertEquals("auth", saslServer.getNegotiatedQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslServer.getNegotiatedQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
     } finally {
       if (cluster != null) {