From ce7b8b5634ef84602019cac4ce52337fbe4f9d42 Mon Sep 17 00:00:00 2001
From: Chen Liang
Date: Tue, 4 Feb 2020 12:12:35 -0800
Subject: [PATCH] HDFS-15148. dfs.namenode.send.qop.enabled should not apply
 to primary NN port. Contributed by Chen Liang.

---
 .../java/org/apache/hadoop/ipc/Server.java     |  49 +++++--
 .../token/block/BlockTokenSecretManager.java   |   2 +-
 .../server/namenode/NameNodeRpcServer.java     |  11 --
 .../hdfs/TestBlockTokenWrappingQOP.java        |  27 +++-
 .../hadoop/hdfs/TestMultipleNNPortQOP.java     | 120 ++++++++++++++++--
 5 files changed, 174 insertions(+), 35 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 7cd3027922d..f4a124adc09 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -382,20 +382,28 @@ public abstract class Server {
 
   /**
    * Returns the SASL qop for the current call, if the current call is
-   * set, and the SASL negotiation is done. Otherwise return null. Note
-   * that CurCall is thread local object. So in fact, different handler
-   * threads will process different CurCall object.
+   * set, and the SASL negotiation is done. Otherwise returns null.
+   * Note this only returns the established QOP for an auxiliary port,
+   * and returns null for the primary (non-auxiliary) port.
+   *
+   * Also note that CurCall is a thread-local object, so different
+   * handler threads will process different CurCall objects.
    *
    * Also, only return for RPC calls, not supported for other protocols.
    * @return the QOP of the current connection.
    */
-  public static String getEstablishedQOP() {
+  public static String getAuxiliaryPortEstablishedQOP() {
     Call call = CurCall.get();
-    if (call == null || !(call instanceof RpcCall)) {
+    if (!(call instanceof RpcCall)) {
       return null;
     }
     RpcCall rpcCall = (RpcCall)call;
-    return rpcCall.connection.getEstablishedQOP();
+    if (rpcCall.connection.isOnAuxiliaryPort()) {
+      return rpcCall.connection.getEstablishedQOP();
+    } else {
+      // Not sending back QOP for primary port
+      return null;
+    }
   }
 
   /**
@@ -1185,7 +1193,8 @@
     private boolean reuseAddr = conf.getBoolean(
         CommonConfigurationKeysPublic.IPC_SERVER_REUSEADDR_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_REUSEADDR_DEFAULT);
-
+    private boolean isOnAuxiliaryPort;
+
     Listener(int port) throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
@@ -1213,6 +1222,11 @@
       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
       this.setName("IPC Server listener on " + port);
       this.setDaemon(true);
+      this.isOnAuxiliaryPort = false;
+    }
+
+    void setIsAuxiliary() {
+      this.isOnAuxiliaryPort = true;
     }
 
     private class Reader extends Thread {
@@ -1381,7 +1395,8 @@
         channel.socket().setKeepAlive(true);
 
         Reader reader = getReader();
-        Connection c = connectionManager.register(channel, this.listenPort);
+        Connection c = connectionManager.register(channel,
+            this.listenPort, this.isOnAuxiliaryPort);
         // If the connectionManager can't take it, close the connection.
         if (c == null) {
           if (channel.isOpen()) {
@@ -1805,6 +1820,7 @@ public abstract class Server {
     private int serviceClass;
     private boolean shouldClose = false;
     private int ingressPort;
+    private boolean isOnAuxiliaryPort;
 
     UserGroupInformation user = null;
     public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1817,7 +1833,7 @@
     private boolean useWrap = false;
 
     public Connection(SocketChannel channel, long lastContact,
-        int ingressPort) {
+        int ingressPort, boolean isOnAuxiliaryPort) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -1830,6 +1846,7 @@
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
       this.ingressPort = ingressPort;
+      this.isOnAuxiliaryPort = isOnAuxiliaryPort;
       if (addr == null) {
         this.hostAddress = "*Unknown*";
       } else {
@@ -1875,7 +1892,11 @@
     public String getEstablishedQOP() {
       return establishedQOP;
     }
-
+
+    public boolean isOnAuxiliaryPort() {
+      return isOnAuxiliaryPort;
+    }
+
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
     }
@@ -3113,6 +3134,8 @@
           "There is already a listener binding to: " + auxiliaryPort);
     }
     Listener newListener = new Listener(auxiliaryPort);
+    newListener.setIsAuxiliary();
+
     // in the case of port = 0, the listener would be on a != 0 port.
     LOG.info("Adding a server listener on port " +
         newListener.getAddress().getPort());
@@ -3732,11 +3755,13 @@
       return connections.toArray(new Connection[0]);
     }
 
-    Connection register(SocketChannel channel, int ingressPort) {
+    Connection register(SocketChannel channel, int ingressPort,
+        boolean isOnAuxiliaryPort) {
      if (isFull()) {
        return null;
      }
-      Connection connection = new Connection(channel, Time.now(), ingressPort);
+      Connection connection = new Connection(channel, Time.now(),
+          ingressPort, isOnAuxiliaryPort);
       add(connection);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server connection from " + connection +
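In isolation, the gate the Server change above introduces is small: report the negotiated QOP only when the connection was accepted on an auxiliary listener. A minimal, self-contained sketch follows; the Conn holder and class name are invented for illustration, and only the null-for-primary behavior mirrors getAuxiliaryPortEstablishedQOP().

public class QopGateSketch {

  /** Simplified stand-in for the per-connection state Server tracks. */
  static final class Conn {
    final String establishedQOP;   // e.g. "auth", "auth-int", "auth-conf"
    final boolean onAuxiliaryPort; // true when the accepting listener is auxiliary

    Conn(String establishedQOP, boolean onAuxiliaryPort) {
      this.establishedQOP = establishedQOP;
      this.onAuxiliaryPort = onAuxiliaryPort;
    }
  }

  /** Mirrors the new gate: never report QOP for the primary port. */
  static String auxiliaryPortEstablishedQOP(Conn conn) {
    if (conn == null || !conn.onAuxiliaryPort) {
      return null;
    }
    return conn.establishedQOP;
  }

  public static void main(String[] args) {
    System.out.println(auxiliaryPortEstablishedQOP(new Conn("auth-conf", true)));  // auth-conf
    System.out.println(auxiliaryPortEstablishedQOP(new Conn("auth-conf", false))); // null
  }
}

Run standalone, this prints the QOP for the auxiliary connection and null for the primary one, which is exactly the behavior the patch title asks for.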
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index a56074a0345..c01ab56ca20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -290,7 +290,7 @@ public class BlockTokenSecretManager extends
         .getBlockPoolId(), block.getBlockId(), modes, storageTypes,
         storageIds, useProto);
     if (shouldWrapQOP) {
-      String qop = Server.getEstablishedQOP();
+      String qop = Server.getAuxiliaryPortEstablishedQOP();
       if (qop != null) {
         id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index e8dace9eb82..b4030164ca4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1900,17 +1900,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return clientMachine;
   }
 
-  /**
-   * Return the QOP of the client that the current handler thread
-   * is handling. Assuming the negotiation is done at this point,
-   * otherwise returns null.
-   *
-   * @return the established QOP of this client.
-   */
-  public static String getEstablishedClientQOP() {
-    return Server.getEstablishedQOP();
-  }
-
   @Override
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
     checkNNStartup();
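The BlockTokenSecretManager hunk above is the call site that consumes the renamed method: when dfs.namenode.send.qop.enabled is on, the negotiated QOP is wrapped into the block token's handshake message. A hedged sketch of that caller-side pattern, with a simplified stand-in for BlockTokenIdentifier (the real class carries much more state) and a hard-coded stand-in for the Server lookup:

import java.nio.charset.StandardCharsets;

public class WrapQopSketch {
  /** Simplified stand-in for BlockTokenIdentifier. */
  static final class TokenId {
    private byte[] handshakeMsg = new byte[0];
    void setHandshakeMsg(byte[] msg) { handshakeMsg = msg; }
    byte[] getHandshakeMsg() { return handshakeMsg; }
  }

  /** Stand-in for Server.getAuxiliaryPortEstablishedQOP(); may return null. */
  static String auxiliaryPortEstablishedQOP() {
    return "auth-int";
  }

  public static void main(String[] args) {
    TokenId id = new TokenId();
    boolean shouldWrapQOP = true; // mirrors dfs.namenode.send.qop.enabled
    if (shouldWrapQOP) {
      String qop = auxiliaryPortEstablishedQOP();
      if (qop != null) { // after this patch: always null on the primary port
        id.setHandshakeMsg(qop.getBytes(StandardCharsets.UTF_8));
      }
    }
    System.out.println(
        new String(id.getHandshakeMsg(), StandardCharsets.UTF_8)); // auth-int
  }
}

The null check is what makes the Server-side change sufficient: on the primary port the lookup now returns null, so no handshake message is attached and no NameNode-side removal logic is needed beyond deleting the unused getEstablishedClientQOP() wrapper.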
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
index 94b80e6485e..c224c4916b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -77,12 +80,33 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
   @Before
   public void setup() throws Exception {
     conf = createSecureConfig(this.configKey);
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "12000");
+    // Explicitly set the service rpc address for the datanode. This is
+    // because DFSUtil.getNNServiceRpcAddressesForCluster looks up the
+    // client-facing port and the service port at the same time, and if no
+    // service rpc address is set, it returns the client port, which here
+    // would be the auxiliary port for the datanode, not what the auxiliary
+    // port is for. Setting the service rpc port avoids this.
+    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
+    conf.set(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+        "org.apache.hadoop.security.IngressPortBasedResolver");
+    conf.set("ingress.port.sasl.configured.ports", "12000");
+    conf.set("ingress.port.sasl.prop.12000", this.configKey);
     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
     conf.set(HADOOP_RPC_PROTECTION, this.configKey);
     cluster = null;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
+
+    HdfsConfiguration clientConf = new HdfsConfiguration(conf);
+    clientConf.unset(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+    URI currentURI = cluster.getURI();
+    URI uriAuxiliary = new URI(currentURI.getScheme() +
+        "://" + currentURI.getHost() + ":12000");
+    dfs = (DistributedFileSystem) FileSystem.get(uriAuxiliary, conf);
   }
 
   @After
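Condensed, the setup hunk above wires one auxiliary NameNode port whose SASL properties are chosen by ingress port rather than by the global hadoop.rpc.protection. The sketch below writes out that wiring with literal key strings standing in for the constants the test uses (DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY and friends); the spellings are believed correct but should be checked against DFSConfigKeys and CommonConfigurationKeys before relying on them.

import org.apache.hadoop.conf.Configuration;

public class AuxPortConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // One auxiliary NameNode RPC port.
    conf.set("dfs.namenode.rpc-address.auxiliary-ports", "12000");
    // Pick SASL properties by ingress port instead of the global default.
    conf.set("hadoop.security.saslproperties.resolver.class",
        "org.apache.hadoop.security.IngressPortBasedResolver");
    conf.set("ingress.port.sasl.configured.ports", "12000");
    conf.set("ingress.port.sasl.prop.12000", "privacy");
    // Enable block tokens and QOP wrapping, as the test setup does.
    conf.setBoolean("dfs.block.access.token.enable", true);
    conf.setBoolean("dfs.namenode.send.qop.enabled", true);
  }
}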
@@ -97,7 +121,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
     final String src = "/testAddBlockWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     dfs.create(path);
 
     DFSClient client = dfs.getClient();
@@ -114,7 +137,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
     final String src = "/testAppendWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // NameNode append call returns a last block instance. If there is nothing
     // it returns as a null. So write something, so that lastBlock has
@@ -138,7 +160,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
     final String src = "/testGetBlockLocationWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // if the file is empty, there will be no blocks returned. Write something
     // so that getBlockLocations actually returns some block.
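The TestMultipleNNPortQOP diff below builds client URIs for ports 12000, 12100 and 12200. The per-port QOP mapping those URIs imply is configured outside the hunks shown here, so the sketch below is inferred from the URI names and the protection levels the test pairs with them; treat the mapping as an assumption, not a quotation from the test.

import org.apache.hadoop.conf.Configuration;

public class MultiPortQopSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Three auxiliary ports, each pinned to one QOP (inferred mapping).
    conf.set("dfs.namenode.rpc-address.auxiliary-ports", "12000,12100,12200");
    conf.set("ingress.port.sasl.configured.ports", "12000,12100,12200");
    conf.set("ingress.port.sasl.prop.12000", "authentication");
    conf.set("ingress.port.sasl.prop.12100", "integrity");
    conf.set("ingress.port.sasl.prop.12200", "privacy");
  }
}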
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
index ca8455719b3..db42dcc254e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
@@ -21,13 +21,17 @@ import java.net.URI;
 import java.util.ArrayList;
 
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Before;
 import org.junit.Test;
@@ -77,6 +81,85 @@ public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
     clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
   }
 
+  /**
+   * Test that when the NameNode sends back its established QOP,
+   * it does so only for auxiliary port(s), not the primary port.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAuxiliaryPortSendingQOP() throws Exception {
+    MiniDFSCluster cluster = null;
+
+    final String pathPrefix = "/filetestAuxiliaryPortSendingQOP";
+    try {
+      cluster = new MiniDFSCluster.Builder(clusterConf)
+          .numDataNodes(3).build();
+
+      cluster.waitActive();
+      HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+      clientConf.unset(
+          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+
+      URI currentURI = cluster.getURI();
+      URI uriAuthPort = new URI(currentURI.getScheme() + "://" +
+          currentURI.getHost() + ":12000");
+      URI uriIntegrityPort = new URI(currentURI.getScheme() + "://" +
+          currentURI.getHost() + ":12100");
+      URI uriPrivacyPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12200");
+
+      // If connecting to primary port, block token should not include
+      // handshake secret
+      byte[] secretOnPrimary = getHandshakeSecret(currentURI, clientConf,
+          new Path(pathPrefix + "Primary"));
+      assertTrue(secretOnPrimary == null || secretOnPrimary.length == 0);
+
+      // If connecting to auxiliary port, block token should include
+      // handshake secret
+      clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+      byte[] secretPrivacy = getHandshakeSecret(uriPrivacyPort, clientConf,
+          new Path(pathPrefix + "Privacy"));
+      assertTrue(secretPrivacy.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+      byte[] secretIntegrity = getHandshakeSecret(uriIntegrityPort, clientConf,
+          new Path(pathPrefix + "Integrity"));
+      assertTrue(secretIntegrity.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+      byte[] secretAuthentication = getHandshakeSecret(uriAuthPort,
+          clientConf, new Path(pathPrefix + "Authentication"));
+      assertTrue(secretAuthentication.length > 0);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private byte[] getHandshakeSecret(URI uri, HdfsConfiguration conf,
+      Path path) throws Exception {
+    FileSystem fs = FileSystem.get(uri, conf);
+    FSDataOutputStream out = fs.create(
+        path, false, 4096, (short)1, BLOCK_SIZE);
+    try {
+      out.write(0);
+      out.hflush();
+      Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
+      final byte[] tokenBytes = token.getIdentifier();
+      DataInputBuffer dib = new DataInputBuffer();
+
+      dib.reset(tokenBytes, tokenBytes.length);
+      BlockTokenIdentifier blockToken = new BlockTokenIdentifier();
+      blockToken.readFields(dib);
+      return blockToken.getHandshakeMsg();
+    } finally {
+      out.close();
+    }
+  }
+
   /**
    * Test accessing NameNode from three different ports.
    *
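getHandshakeSecret above relies on the standard Hadoop Writable round trip: take the token's raw identifier bytes, reset a DataInputBuffer over them, and readFields into a fresh instance. The same pattern in isolation, using Text as a stand-in Writable so the sketch runs without a cluster:

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

public class WritableDecodeSketch {
  public static void main(String[] args) throws Exception {
    // Serialize a Writable to raw bytes.
    DataOutputBuffer out = new DataOutputBuffer();
    new Text("auth-conf").write(out);

    // Wrap the raw bytes and deserialize into a fresh instance,
    // exactly as getHandshakeSecret does with BlockTokenIdentifier.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    Text decoded = new Text();
    decoded.readFields(in);

    System.out.println(decoded); // auth-conf
  }
}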
@@ -168,33 +251,54 @@
       clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
       FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
       doTest(fsPrivacy, PATH1);
-      // add a wait so that data has reached not only first DN,
-      // but also the rest
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        // It may take some time for the QOP to propagate
+        // to all DNs, check in a loop.
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
 
       clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
       FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
       doTest(fsIntegrity, PATH2);
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
 
       clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
       FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
       doTest(fsAuth, PATH3);
-      Thread.sleep(100);
       for (int i = 0; i < 3; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferServer saslServer = dn.getSaslServer();
-        assertEquals("auth", saslServer.getNegotiatedQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslServer.getNegotiatedQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
     } finally {
       if (cluster != null) {
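The hunk above replaces each fixed Thread.sleep(100) with bounded polling, so the assertions no longer race against QOP propagation to the DataNodes. A generic helper capturing that retry shape (hypothetical, not part of the patch):

import java.util.function.Supplier;

public class PollSketch {
  /** Poll until the probe yields a non-null value or attempts run out. */
  static <T> T pollUntilNonNull(Supplier<T> probe, int attempts, long sleepMs)
      throws InterruptedException {
    T value = null;
    for (int trial = 0; trial < attempts && value == null; trial++) {
      value = probe.get();
      if (value == null) {
        Thread.sleep(sleepMs); // give the cluster time to propagate state
      }
    }
    return value; // may still be null if every attempt failed
  }

  public static void main(String[] args) throws InterruptedException {
    // In the test, the probe would be saslClient.getTargetQOP()
    // or saslServer.getNegotiatedQOP().
    String qop = pollUntilNonNull(() -> "auth", 10, 100);
    System.out.println(qop);
  }
}

The bounded loop keeps the previous 100 ms cadence but gives up after roughly a second instead of asserting against a value that may not have arrived yet.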