From b97fdb7b4c61d0f553b0a7df6d285726ea45cd62 Mon Sep 17 00:00:00 2001
From: Chen Liang
Date: Tue, 4 Feb 2020 12:59:31 -0800
Subject: [PATCH] HDFS-15148. dfs.namenode.send.qop.enabled should not apply
 to primary NN port. Contributed by Chen Liang.

---
 .../java/org/apache/hadoop/ipc/Server.java         |  49 +++++--
 .../token/block/BlockTokenSecretManager.java       |   2 +-
 .../server/namenode/NameNodeRpcServer.java         |  11 --
 .../hdfs/TestBlockTokenWrappingQOP.java            |  30 ++++-
 .../hadoop/hdfs/TestMultipleNNPortQOP.java         | 120 ++++++++++++++++--
 5 files changed, 175 insertions(+), 37 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index b7cc6ab48c4..a9ec756154e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -375,20 +375,28 @@ public static InetAddress getRemoteIp() {
 
   /**
    * Returns the SASL qop for the current call, if the current call is
-   * set, and the SASL negotiation is done. Otherwise return null. Note
-   * that CurCall is thread local object. So in fact, different handler
-   * threads will process different CurCall object.
+   * set, and the SASL negotiation is done. Otherwise return null.
+   * Note this only returns the established QOP for an auxiliary port;
+   * it returns null for the primary (non-auxiliary) port.
+   *
+   * Also note that CurCall is a thread-local object, so different
+   * handler threads will in fact process different CurCall objects.
    *
    * Also, only return for RPC calls, not supported for other protocols.
    * @return the QOP of the current connection.
    */
-  public static String getEstablishedQOP() {
+  public static String getAuxiliaryPortEstablishedQOP() {
     Call call = CurCall.get();
-    if (call == null || !(call instanceof RpcCall)) {
+    if (!(call instanceof RpcCall)) {
       return null;
     }
     RpcCall rpcCall = (RpcCall)call;
-    return rpcCall.connection.getEstablishedQOP();
+    if (rpcCall.connection.isOnAuxiliaryPort()) {
+      return rpcCall.connection.getEstablishedQOP();
+    } else {
+      // Not sending back QOP for primary port
+      return null;
+    }
   }
 
   /**
@@ -1169,7 +1177,8 @@ private class Listener extends Thread {
     private int backlogLength = conf.getInt(
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
-
+    private boolean isOnAuxiliaryPort;
+
     Listener(int port) throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
@@ -1196,6 +1205,11 @@ private class Listener extends Thread {
       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
       this.setName("IPC Server listener on " + port);
       this.setDaemon(true);
+      this.isOnAuxiliaryPort = false;
+    }
+
+    void setIsAuxiliary() {
+      this.isOnAuxiliaryPort = true;
     }
 
     private class Reader extends Thread {
@@ -1364,7 +1378,8 @@ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
        channel.socket().setKeepAlive(true);
 
        Reader reader = getReader();
-       Connection c = connectionManager.register(channel, this.listenPort);
+       Connection c = connectionManager.register(channel,
+           this.listenPort, this.isOnAuxiliaryPort);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
@@ -1788,6 +1803,7 @@ public class Connection {
     private int serviceClass;
     private boolean shouldClose = false;
     private int ingressPort;
+    private boolean isOnAuxiliaryPort;
     UserGroupInformation user = null;
     public UserGroupInformation attemptingUser = null; // user name before auth
 
@@ -1800,7 +1816,7 @@ public class Connection {
     private boolean useWrap = false;
 
     public Connection(SocketChannel channel, long lastContact,
-        int ingressPort) {
+        int ingressPort, boolean isOnAuxiliaryPort) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -1810,6 +1826,7 @@ public Connection(SocketChannel channel, long lastContact,
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
       this.ingressPort = ingressPort;
+      this.isOnAuxiliaryPort = isOnAuxiliaryPort;
       if (addr == null) {
         this.hostAddress = "*Unknown*";
       } else {
@@ -1855,7 +1872,11 @@ public InetAddress getHostInetAddress() {
     public String getEstablishedQOP() {
       return establishedQOP;
     }
-
+
+    public boolean isOnAuxiliaryPort() {
+      return isOnAuxiliaryPort;
+    }
+
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
     }
@@ -3016,6 +3037,8 @@ public synchronized void addAuxiliaryListener(int auxiliaryPort)
           "There is already a listener binding to: " + auxiliaryPort);
     }
     Listener newListener = new Listener(auxiliaryPort);
+    newListener.setIsAuxiliary();
+
     // in the case of port = 0, the listener would be on a != 0 port.
     LOG.info("Adding a server listener on port " +
         newListener.getAddress().getPort());
@@ -3635,11 +3658,13 @@ Connection[] toArray() {
       return connections.toArray(new Connection[0]);
     }
 
-    Connection register(SocketChannel channel, int ingressPort) {
+    Connection register(SocketChannel channel, int ingressPort,
+        boolean isOnAuxiliaryPort) {
      if (isFull()) {
        return null;
      }
-      Connection connection = new Connection(channel, Time.now(), ingressPort);
+      Connection connection = new Connection(channel, Time.now(),
+          ingressPort, isOnAuxiliaryPort);
      add(connection);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Server connection from " + connection +
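Note on the Server.java change above: after this patch, the negotiated QOP is only ever reported for connections that arrived on an auxiliary listener; the primary port never echoes its QOP back. A minimal compilable sketch of the same gating logic in isolation (the Conn interface below is a stand-in for Server.Connection, not a real Hadoop type; names are illustrative only):

    // Sketch only: mirrors the null-return cases of
    // Server#getAuxiliaryPortEstablishedQOP after this patch.
    final class QopGateSketch {
      interface Conn {
        boolean isOnAuxiliaryPort();
        String getEstablishedQOP(); // null until SASL negotiation completes
      }

      static String auxiliaryPortQop(Conn conn) {
        if (conn == null) {
          return null; // no current RPC call on this handler thread
        }
        if (!conn.isOnAuxiliaryPort()) {
          return null; // primary port: never report QOP back
        }
        return conn.getEstablishedQOP(); // may still be null mid-negotiation
      }

      private QopGateSketch() {
      }
    }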
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index dae89c344af..0a3537d312f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -272,7 +272,7 @@ public Token<BlockTokenIdentifier> generateToken(String userId,
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
         .getBlockPoolId(), block.getBlockId(), modes);
     if (shouldWrapQOP) {
-      String qop = Server.getEstablishedQOP();
+      String qop = Server.getAuxiliaryPortEstablishedQOP();
       if (qop != null) {
         id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
       }
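With dfs.namenode.send.qop.enabled set, the hunk above wraps the negotiated QOP into the block token's handshake message, now only for tokens issued via an auxiliary port. On the receiving side the bytes come back out of the token identifier; a condensed, hedged sketch of that decode path (it mirrors the getHandshakeSecret() helper added to TestMultipleNNPortQOP further down):

    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.io.DataInputBuffer;

    // Sketch: read the wrapped QOP (handshake message) back out of raw
    // block token identifier bytes.
    final class HandshakeMsgSketch {
      static byte[] handshakeMsg(byte[] tokenIdentifierBytes)
          throws IOException {
        DataInputBuffer in = new DataInputBuffer();
        in.reset(tokenIdentifierBytes, tokenIdentifierBytes.length);
        BlockTokenIdentifier id = new BlockTokenIdentifier();
        id.readFields(in);
        // Null or empty when the token was issued via the primary port.
        return id.getHandshakeMsg();
      }

      private HandshakeMsgSketch() {
      }
    }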
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 972f4514bb0..620a017de55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1776,17 +1776,6 @@ private static String getClientMachine() {
     return clientMachine;
   }
 
-  /**
-   * Return the QOP of the client that the current handler thread
-   * is handling. Assuming the negotiation is done at this point,
-   * otherwise returns null.
-   *
-   * @return the established QOP of this client.
-   */
-  public static String getEstablishedClientQOP() {
-    return Server.getEstablishedQOP();
-  }
-
   @Override
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
     checkNNStartup();
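The test changes below drive the client through an auxiliary NameNode port whose SASL properties are chosen per ingress port by IngressPortBasedResolver. For orientation, the server-side wiring the tests rely on reduces to a few configuration keys; a hedged sketch using literal key names (the port number and QOP value match the test setup, not a deployment recommendation):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;

    // Sketch of the NameNode-side configuration exercised by the tests.
    final class AuxPortQopConfSketch {
      static Configuration configure(Configuration conf) {
        // Expose an auxiliary RPC port next to the primary one.
        conf.set("dfs.namenode.rpc-address.auxiliary-ports", "12000");
        // Resolve SASL properties per ingress port instead of globally.
        conf.set(
            CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
            "org.apache.hadoop.security.IngressPortBasedResolver");
        conf.set("ingress.port.sasl.configured.ports", "12000");
        conf.set("ingress.port.sasl.prop.12000", "privacy");
        // Have the NN wrap the negotiated QOP into block tokens; after
        // this patch that applies to auxiliary ports only.
        conf.setBoolean("dfs.namenode.send.qop.enabled", true);
        return conf;
      }

      private AuxPortQopConfSketch() {
      }
    }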
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
index 1a292b04ab9..07b18033b0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -40,8 +43,7 @@
 import org.junit.runners.Parameterized;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -80,12 +82,33 @@ public TestBlockTokenWrappingQOP(String configKey, String qopValue) {
   @Before
   public void setup() throws Exception {
     conf = createSecureConfig(this.configKey);
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "12000");
+    // Explicitly set the service RPC address for datanodes. This is because
+    // DFSUtil.getNNServiceRpcAddressesForCluster looks up the client-facing
+    // port and the service port at the same time; if no service RPC address
+    // is configured, it falls back to the client port, which here would be
+    // the auxiliary port. That is not what the auxiliary port is for, so
+    // set the service RPC port explicitly to avoid this.
+    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
+    conf.set(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+        "org.apache.hadoop.security.IngressPortBasedResolver");
+    conf.set("ingress.port.sasl.configured.ports", "12000");
+    conf.set("ingress.port.sasl.prop.12000", this.configKey);
     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
     conf.set(HADOOP_RPC_PROTECTION, this.configKey);
     cluster = null;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
+
+    HdfsConfiguration clientConf = new HdfsConfiguration(conf);
+    clientConf.unset(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+    URI currentURI = cluster.getURI();
+    URI uriAuxiliary = new URI(currentURI.getScheme() +
+        "://" + currentURI.getHost() + ":12000");
+    dfs = (DistributedFileSystem) FileSystem.get(uriAuxiliary, clientConf);
   }
 
   @After
@@ -100,7 +123,6 @@ public void testAddBlockWrappingQOP() throws Exception {
     final String src = "/testAddBlockWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     dfs.create(path);
 
     DFSClient client = dfs.getClient();
@@ -117,7 +139,6 @@ public void testAppendWrappingQOP() throws Exception {
     final String src = "/testAppendWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // NameNode append call returns a last block instance. If there is nothing
     // it returns as a null. So write something, so that lastBlock has
@@ -141,7 +162,6 @@ public void testGetBlockLocationWrappingQOP() throws Exception {
     final String src = "/testGetBlockLocationWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // if the file is empty, there will be no blocks returned. Write something
     // so that getBlockLocations actually returns some block.
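TestMultipleNNPortQOP below runs the NameNode with one auxiliary port per QOP (its existing setup maps 12000 to authentication, 12100 to integrity, 12200 to privacy). From the client's point of view, choosing a QOP is then just a matter of dialing the matching port; a hedged sketch (the hostname is a placeholder, and the client-side protection setting must agree with what the server resolves for that port):

    import java.net.URI;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Sketch: select QOP by connecting to the matching auxiliary port.
    final class AuxPortClientSketch {
      public static void main(String[] args) throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.set("hadoop.rpc.protection", "privacy");
        FileSystem fs = FileSystem.get(
            new URI("hdfs://nn.example.com:12200"), conf);
        try {
          fs.exists(new Path("/"));
        } finally {
          fs.close();
        }
      }
    }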
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
index ca8455719b3..db42dcc254e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
@@ -21,13 +21,17 @@
 import java.util.ArrayList;
 
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -77,6 +81,85 @@ public void setup() throws Exception {
     clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
   }
 
+  /**
+   * Test that when the NameNode sends back its established QOP, it does
+   * so only for auxiliary port(s), not the primary port.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAuxiliaryPortSendingQOP() throws Exception {
+    MiniDFSCluster cluster = null;
+
+    final String pathPrefix  = "/filetestAuxiliaryPortSendingQOP";
+    try {
+      cluster = new MiniDFSCluster.Builder(clusterConf)
+          .numDataNodes(3).build();
+
+      cluster.waitActive();
+      HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+      clientConf.unset(
+          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+
+      URI currentURI = cluster.getURI();
+      URI uriAuthPort = new URI(currentURI.getScheme() + "://" +
+          currentURI.getHost() + ":12000");
+      URI uriIntegrityPort = new URI(currentURI.getScheme() + "://" +
+          currentURI.getHost() + ":12100");
+      URI uriPrivacyPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12200");
+
+      // If connecting to primary port, block token should not include
+      // handshake secret
+      byte[] secretOnPrimary = getHandshakeSecret(currentURI, clientConf,
+          new Path(pathPrefix + "Primary"));
+      assertTrue(secretOnPrimary == null || secretOnPrimary.length == 0);
+
+      // If connecting to auxiliary port, block token should include
+      // handshake secret
+      clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+      byte[] secretPrivacy = getHandshakeSecret(uriPrivacyPort, clientConf,
+          new Path(pathPrefix + "Privacy"));
+      assertTrue(secretPrivacy.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+      byte[] secretIntegrity = getHandshakeSecret(uriIntegrityPort, clientConf,
+          new Path(pathPrefix + "Integrity"));
+      assertTrue(secretIntegrity.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+      byte[] secretAuthentication = getHandshakeSecret(uriAuthPort,
+          clientConf, new Path(pathPrefix + "Authentication"));
+      assertTrue(secretAuthentication.length > 0);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private byte[] getHandshakeSecret(URI uri, HdfsConfiguration conf,
+      Path path) throws Exception {
+    FileSystem fs = FileSystem.get(uri, conf);
+    FSDataOutputStream out = fs.create(
+        path, false, 4096, (short)1, BLOCK_SIZE);
+    try {
+      out.write(0);
+      out.hflush();
+      Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
+      final byte[] tokenBytes = token.getIdentifier();
+      DataInputBuffer dib = new DataInputBuffer();
+
+      dib.reset(tokenBytes, tokenBytes.length);
+      BlockTokenIdentifier blockToken = new BlockTokenIdentifier();
+      blockToken.readFields(dib);
+      return blockToken.getHandshakeMsg();
+    } finally {
+      out.close();
+    }
+  }
+
   /**
    * Test accessing NameNode from three different ports.
    *
@@ -168,33 +251,54 @@ public void testMultipleNNPortOverwriteDownStream() throws Exception {
       clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
       FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
       doTest(fsPrivacy, PATH1);
-      // add a wait so that data has reached not only first DN,
-      // but also the rest
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        // It may take some time for the QOP to propagate
+        // to all DNs; check in a loop.
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
 
       clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
       FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
       doTest(fsIntegrity, PATH2);
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
 
       clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
       FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
       doTest(fsAuth, PATH3);
-      Thread.sleep(100);
       for (int i = 0; i < 3; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferServer saslServer = dn.getSaslServer();
-        assertEquals("auth", saslServer.getNegotiatedQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslServer.getNegotiatedQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
     } finally {
       if (cluster != null) {
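One remark on the retry loops introduced above: they poll getTargetQOP()/getNegotiatedQOP() up to ten times at 100 ms intervals before asserting, replacing the fixed Thread.sleep(100). The same wait could arguably be expressed with Hadoop's own test utility GenericTestUtils.waitFor from hadoop-common's test artifacts; a hedged sketch with the same one-second budget:

    import java.util.concurrent.TimeoutException;

    import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
    import org.apache.hadoop.test.GenericTestUtils;

    // Sketch: wait until the DN's SASL client has a target QOP, polling
    // every 100 ms for at most 1000 ms, then return it.
    final class QopWaitSketch {
      static String waitForTargetQop(final SaslDataTransferClient saslClient)
          throws TimeoutException, InterruptedException {
        GenericTestUtils.waitFor(() -> saslClient.getTargetQOP() != null,
            100, 1000);
        return saslClient.getTargetQOP();
      }

      private QopWaitSketch() {
      }
    }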