diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 432df34e477..17f57b2c7e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -34,6 +34,7 @@ import static org.apache.hadoop.util.Time.now; import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -187,6 +188,7 @@ import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.ProxyUsers; @@ -2222,6 +2224,24 @@ public class NameNodeRpcServer implements NamenodeProtocols { // guaranteed to have been written by this NameNode.) boolean readInProgress = syncTxid > 0; + // doas the NN login user for the actual operations to get edits. + // Notably this is necessary when polling from the remote edits via https. + // We have validated the client is a superuser from the NN RPC, so this + // running as the login user here is safe. + EventBatchList ret = SecurityUtil.doAsLoginUser( + new PrivilegedExceptionAction() { + @Override + public EventBatchList run() throws IOException { + return getEventBatchList(syncTxid, txid, log, readInProgress, + maxEventsPerRPC); + } + }); + return ret; + } + + private EventBatchList getEventBatchList(long syncTxid, long txid, + FSEditLog log, boolean readInProgress, int maxEventsPerRPC) + throws IOException { List batches = Lists.newArrayList(); int totalEvents = 0; long maxSeenTxid = -1; @@ -2240,7 +2260,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { // and are using QJM -- the edit log will be closed and this exception // will result LOG.info("NN is transitioning from active to standby and FSEditLog " + - "is closed -- could not read edits"); + "is closed -- could not read edits"); return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStreamKerberized.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStreamKerberized.java new file mode 100644 index 00000000000..ace7c3bcdef --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStreamKerberized.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.inotify.EventBatch; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.hdfs.qjournal.TestSecureNNWithQJM; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.ssl.KeyStoreTestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.security.PrivilegedExceptionAction; +import java.util.Properties; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Class for Kerberized test cases for {@link DFSInotifyEventInputStream}. + */ +public class TestDFSInotifyEventInputStreamKerberized { + + private static final Logger LOG = + LoggerFactory.getLogger(TestDFSInotifyEventInputStreamKerberized.class); + + private File baseDir; + private String keystoresDir; + private String sslConfDir; + + private MiniKdc kdc; + private Configuration baseConf; + private Configuration conf; + private MiniQJMHACluster cluster; + private File generalHDFSKeytabFile; + private File nnKeytabFile; + + @Rule + public Timeout timeout = new Timeout(180000); + + @Test + public void testWithKerberizedCluster() throws Exception { + conf = new HdfsConfiguration(baseConf); + // make sure relogin can happen after tgt expiration. + conf.setInt(HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN, 3); + // make sure the rpc connection is not reused + conf.setInt(IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY, 100); + conf.setInt(IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 2000); + Client.setConnectTimeout(conf, 2000); + // force the remote journal to be the only edits dir, so we can test + // EditLogFileInputStream$URLLog behavior. + cluster = new MiniQJMHACluster.Builder(conf).setForceRemoteEditsOnly(true) + .build(); + cluster.getDfsCluster().waitActive(); + cluster.getDfsCluster().transitionToActive(0); + + final UserGroupInformation ugi = UserGroupInformation + .loginUserFromKeytabAndReturnUGI("hdfs", + generalHDFSKeytabFile.getAbsolutePath()); + + UserGroupInformation.setShouldRenewImmediatelyForTests(true); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + LOG.info("Current user is: " + UserGroupInformation.getCurrentUser() + + " login user is:" + UserGroupInformation.getLoginUser()); + Configuration clientConf = + new Configuration(cluster.getDfsCluster().getConfiguration(0)); + try (DistributedFileSystem clientFs = + (DistributedFileSystem) FileSystem.get(clientConf)) { + clientFs.mkdirs(new Path("/test")); + LOG.info("mkdir /test success"); + final DFSInotifyEventInputStream eis = + clientFs.getInotifyEventStream(); + // verify we can poll + EventBatch batch; + while ((batch = eis.poll()) != null) { + LOG.info("txid: " + batch.getTxid()); + } + assertNull("poll should not return anything", eis.poll()); + + Thread.sleep(6000); + LOG.info("Slept 6 seconds to make sure the TGT has expired."); + + UserGroupInformation.getCurrentUser() + .checkTGTAndReloginFromKeytab(); + clientFs.mkdirs(new Path("/test1")); + LOG.info("mkdir /test1 success"); + + // verify we can poll after a tgt expiration interval + batch = eis.poll(); + assertNotNull("poll should return something", batch); + assertEquals(1, batch.getEvents().length); + assertNull("poll should not return anything", eis.poll()); + return null; + } + } + }); + } + + @Before + public void initKerberizedCluster() throws Exception { + baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"), + TestDFSInotifyEventInputStreamKerberized.class.getSimpleName()); + FileUtil.fullyDelete(baseDir); + assertTrue(baseDir.mkdirs()); + + final Properties kdcConf = MiniKdc.createConf(); + kdcConf.setProperty(MiniKdc.MAX_TICKET_LIFETIME, "5"); + kdcConf.setProperty(MiniKdc.MIN_TICKET_LIFETIME, "5"); + kdc = new MiniKdc(kdcConf, baseDir); + kdc.start(); + + baseConf = new HdfsConfiguration(); + SecurityUtil.setAuthenticationMethod( + UserGroupInformation.AuthenticationMethod.KERBEROS, baseConf); + UserGroupInformation.setConfiguration(baseConf); + assertTrue("Expected configuration to enable security", + UserGroupInformation.isSecurityEnabled()); + + final String userName = "hdfs"; + nnKeytabFile = new File(baseDir, userName + ".keytab"); + final String keytab = nnKeytabFile.getAbsolutePath(); + // Windows will not reverse name lookup "127.0.0.1" to "localhost". + final String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost"; + kdc.createPrincipal(nnKeytabFile, userName + "/" + krbInstance, + "HTTP/" + krbInstance); + generalHDFSKeytabFile = new File(baseDir, "hdfs_general.keytab"); + kdc.createPrincipal(generalHDFSKeytabFile, "hdfs"); + assertTrue(generalHDFSKeytabFile.exists()); + final String hdfsPrincipal = + userName + "/" + krbInstance + "@" + kdc.getRealm(); + final String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm(); + + baseConf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal); + baseConf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab); + baseConf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal); + baseConf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab); + baseConf + .set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal); + baseConf.set(DFS_JOURNALNODE_KEYTAB_FILE_KEY, keytab); + baseConf.set(DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal); + baseConf.set(DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY, + spnegoPrincipal); + baseConf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + baseConf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); + baseConf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); + baseConf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); + baseConf.set(DFS_JOURNALNODE_HTTPS_ADDRESS_KEY, "localhost:0"); + baseConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10); + + keystoresDir = baseDir.getAbsolutePath(); + sslConfDir = KeyStoreTestUtil.getClasspathDir(TestSecureNNWithQJM.class); + KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, baseConf, false); + baseConf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY, + KeyStoreTestUtil.getClientSSLConfigFileName()); + baseConf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY, + KeyStoreTestUtil.getServerSSLConfigFileName()); + } + + @After + public void shutdownCluster() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + if (kdc != null) { + kdc.stop(); + } + FileUtil.fullyDelete(baseDir); + KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java index c1638942c85..1005f7fcc09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java @@ -48,6 +48,7 @@ public class MiniQJMHACluster { private StartupOption startOpt = null; private int numNNs = 2; private final MiniDFSCluster.Builder dfsBuilder; + private boolean forceRemoteEditsOnly = false; public Builder(Configuration conf) { this.conf = conf; @@ -72,6 +73,11 @@ public class MiniQJMHACluster { this.numNNs = nns; return this; } + + public Builder setForceRemoteEditsOnly(boolean val) { + this.forceRemoteEditsOnly = val; + return this; + } } public static MiniDFSNNTopology createDefaultTopology(int nns, int startingPort) { @@ -107,7 +113,7 @@ public class MiniQJMHACluster { // start cluster with specified NameNodes MiniDFSNNTopology topology = createDefaultTopology(builder.numNNs, basePort); - initHAConf(journalURI, builder.conf, builder.numNNs, basePort); + initHAConf(journalURI, builder, basePort); // First start up the NNs just to format the namespace. The MinIDFSCluster // has no way to just format the NameNodes without also starting them. @@ -139,14 +145,19 @@ public class MiniQJMHACluster { } } - private Configuration initHAConf(URI journalURI, Configuration conf, - int numNNs, int basePort) { + private Configuration initHAConf(URI journalURI, Builder builder, + int basePort) { conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, journalURI.toString()); + if (builder.forceRemoteEditsOnly) { + conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, journalURI.toString()); + conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, + journalURI.toString()); + } - List nns = new ArrayList(numNNs); + List nns = new ArrayList<>(builder.numNNs); int port = basePort; - for (int i = 0; i < numNNs; i++) { + for (int i = 0; i < builder.numNNs; i++) { nns.add("127.0.0.1:" + port); // increment by 2 each time to account for the http port in the config setting port += 2;