diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
index 35ffb429816..353260f0f9b 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.crypto.key.kms.server;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.PropertyConfigurator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,8 +104,6 @@ public class KMSConfiguration {
 
   public static final boolean KEY_AUTHORIZATION_ENABLE_DEFAULT = true;
 
-  private static final String LOG4J_PROPERTIES = "kms-log4j.properties";
-
   static {
     Configuration.addDefaultResource(KMS_DEFAULT_XML);
     Configuration.addDefaultResource(KMS_SITE_XML);
@@ -163,31 +161,20 @@ public class KMSConfiguration {
     return newer;
   }
 
-  public static void initLogging() {
-    String confDir = System.getProperty(KMS_CONFIG_DIR);
-    if (confDir == null) {
-      throw new RuntimeException("System property '" +
-          KMSConfiguration.KMS_CONFIG_DIR + "' not defined");
+  /**
+   * Validate whether "kms.config.dir" and "log4j.configuration" are defined in the System
+   * properties. If not, abort the KMS WebServer.
+   */
+  public static void validateSystemProps() {
+    if (System.getProperty(KMS_CONFIG_DIR) == null) {
+      String errorMsg = "System property '" + KMS_CONFIG_DIR + "' not defined";
+      System.err.println("Aborting KMSWebServer because " + errorMsg);
+      throw new RuntimeException(errorMsg);
     }
     if (System.getProperty("log4j.configuration") == null) {
-      System.setProperty("log4j.defaultInitOverride", "true");
-      boolean fromClasspath = true;
-      File log4jConf = new File(confDir, LOG4J_PROPERTIES).getAbsoluteFile();
-      if (log4jConf.exists()) {
-        PropertyConfigurator.configureAndWatch(log4jConf.getPath(), 1000);
-        fromClasspath = false;
-      } else {
-        ClassLoader cl = Thread.currentThread().getContextClassLoader();
-        URL log4jUrl = cl.getResource(LOG4J_PROPERTIES);
-        if (log4jUrl != null) {
-          PropertyConfigurator.configure(log4jUrl);
-        }
-      }
-      LOG.debug("KMS log starting");
-      if (fromClasspath) {
-        LOG.warn("Log4j configuration file '{}' not found", LOG4J_PROPERTIES);
-        LOG.warn("Logging with INFO level to standard output");
-      }
+      String errorMsg = "System property 'log4j.configuration' not defined";
+      System.err.println("Aborting KMSWebServer because " + errorMsg);
+      throw new RuntimeException(errorMsg);
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebServer.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebServer.java
index a6cab81eb8e..5c9f23e9a0c 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebServer.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebServer.java
@@ -185,7 +185,7 @@ public class KMSWebServer {
   }
 
   public static void main(String[] args) throws Exception {
-    KMSConfiguration.initLogging();
+    KMSConfiguration.validateSystemProps();
     StringUtils.startupShutdownMessage(KMSWebServer.class, args, LOG);
     Configuration conf = KMSConfiguration.getKMSConf();
     Configuration sslConf = SSLFactory.readSSLConfiguration(conf, SSLFactory.Mode.SERVER);
diff --git a/hadoop-common-project/hadoop-kms/src/main/libexec/shellprofile.d/hadoop-kms.sh b/hadoop-common-project/hadoop-kms/src/main/libexec/shellprofile.d/hadoop-kms.sh
index 0d084bb36e6..b54bf811d6b 100755
--- a/hadoop-common-project/hadoop-kms/src/main/libexec/shellprofile.d/hadoop-kms.sh
+++ b/hadoop-common-project/hadoop-kms/src/main/libexec/shellprofile.d/hadoop-kms.sh
@@ -49,6 +49,8 @@ function hadoop_subcommand_kms
     "-Dkms.config.dir=${HADOOP_CONF_DIR}"
   hadoop_add_param HADOOP_OPTS "-Dkms.log.dir=" \
     "-Dkms.log.dir=${HADOOP_LOG_DIR}"
+  hadoop_add_param HADOOP_OPTS "-Dlog4j.configuration=" \
+    "-Dlog4j.configuration=file:${HADOOP_CONF_DIR}/kms-log4j.properties"
 
   if [[ "${HADOOP_DAEMON_MODE}" == "default" ]] ||
      [[ "${HADOOP_DAEMON_MODE}" == "start" ]]; then
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 7b664e4f311..a8d80016072 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -224,7 +224,7 @@ public class DFSInputStream extends FSInputStream
   }
 
   /**
-   * Grab the open-file info from namenode
+   * Grab the open-file info from namenode.
    * @param refreshLocatedBlocks whether to re-fetch locatedblocks
    */
   void openInfo(boolean refreshLocatedBlocks) throws IOException {
@@ -940,7 +940,8 @@ public class DFSInputStream extends FSInputStream
    * @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is
    * false.
    */
-  private DNAddrPair chooseDataNode(LocatedBlock block,
+  @VisibleForTesting
+  DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
       throws IOException {
     while (true) {
@@ -955,6 +956,14 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
+  /**
+   * RefetchLocations should only be called when there are no active requests
+   * to datanodes. In the hedged read case this means futures should be empty.
+   * @param block The locatedBlock to get new datanode locations for.
+   * @param ignoredNodes A list of ignored nodes. This list can be null and can be cleared.
+   * @return the locatedBlock with updated datanode locations.
+   * @throws IOException
+   */
   private LocatedBlock refetchLocations(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
@@ -999,13 +1008,24 @@ public class DFSInputStream extends FSInputStream
       throw new InterruptedIOException(
           "Interrupted while choosing DataNode for read.");
     }
-    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
+    clearCachedNodeState(ignoredNodes);
     openInfo(true);
     block = refreshLocatedBlock(block);
     failures++;
     return block;
   }
 
+  /**
+   * Clear both the dead nodes and the ignored nodes
+   * @param ignoredNodes is cleared
+   */
+  private void clearCachedNodeState(Collection<DatanodeInfo> ignoredNodes) {
+    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
+    if (ignoredNodes != null) {
+      ignoredNodes.clear();
+    }
+  }
+
   /**
    * Get the best node from which to stream the data.
    * @param block LocatedBlock, containing nodes in priority order.
@@ -1337,8 +1357,12 @@ public class DFSInputStream extends FSInputStream
         } catch (InterruptedException ie) {
           // Ignore and retry
         }
-        if (refetch) {
-          refetchLocations(block, ignored);
+        // If refetch is true, then all nodes are in deadNodes or ignoredNodes.
+        // We should loop through all futures and remove them, so we do not
+        // have concurrent requests to the same node.
+        // Once all futures are cleared, we can clear the ignoredNodes and retry.
+        if (refetch && futures.isEmpty()) {
+          block = refetchLocations(block, ignored);
         }
         // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e44a16f029e..0e46dca9dff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -4174,7 +4174,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       logAuditEvent(false, operationName, src);
       throw e;
     }
-    if (needLocation && isObserver()) {
+    if (dl != null && needLocation && isObserver()) {
       for (HdfsFileStatus fs : dl.getPartialListing()) {
         if (fs instanceof HdfsLocatedFileStatus) {
           LocatedBlocks lbs = ((HdfsLocatedFileStatus) fs).getLocatedBlocks();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
index 50378f60381..2e4e496bc3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -35,11 +36,14 @@ import java.util.Map;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -200,6 +204,25 @@ public class TestDFSInputStreamBlockLocations {
     testWithRegistrationMethod(DFSInputStream::getAllBlocks);
   }
 
+  /**
+   * If the ignoreList contains all datanodes, the ignoredList should be cleared to take advantage
+   * of retries built into chooseDataNode. This is needed for hedged reads
+   * @throws IOException
+   */
+  @Test
+  public void testClearIgnoreListChooseDataNode() throws IOException {
+    final String fileName = "/test_cache_locations";
+    filePath = createFile(fileName);
+
+    try (DFSInputStream fin = dfsClient.open(fileName)) {
+      LocatedBlocks existing = fin.locatedBlocks;
+      LocatedBlock block = existing.getLastLocatedBlock();
+      ArrayList<DatanodeInfo> ignoreList = new ArrayList<>(Arrays.asList(block.getLocations()));
+      Assert.assertNotNull(fin.chooseDataNode(block, ignoreList, true));
+      Assert.assertEquals(0, ignoreList.size());
+    }
+  }
+
   @FunctionalInterface
   interface ThrowingConsumer {
     void accept(DFSInputStream fin) throws IOException;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index c1e0dbb8e63..729a7941605 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -603,7 +603,9 @@ public class TestPread {
       input.read(0, buffer, 0, 1024);
       Assert.fail("Reading the block should have thrown BlockMissingException");
     } catch (BlockMissingException e) {
-      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+      // The result of 9 is due to 2 blocks by 4 iterations plus one because
+      // hedgedReadOpsLoopNumForTesting is incremented at start of the loop.
+      assertEquals(9, input.getHedgedReadOpsLoopNumForTesting());
       assertTrue(metrics.getHedgedReadOps() == 0);
     } finally {
       Mockito.reset(injector);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index d6f42f3d020..b744a6fa586 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -1075,16 +1075,14 @@ public class TestFsDatasetImpl {
   @Test(timeout = 30000)
   public void testReportBadBlocks() throws Exception {
     boolean threwException = false;
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration config = new HdfsConfiguration();
-      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+    final Configuration config = new HdfsConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
+        .numDataNodes(1).build()) {
       cluster.waitActive();
       Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks());
 
       DataNode dataNode = cluster.getDataNodes().get(0);
-      ExtendedBlock block =
-          new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), 0);
+      ExtendedBlock block = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), 0);
       try {
         // Test the reportBadBlocks when the volume is null
         dataNode.reportBadBlocks(block);
@@ -1101,15 +1099,11 @@ public class TestFsDatasetImpl {
 
       block = DFSTestUtil.getFirstBlock(fs, filePath);
       // Test for the overloaded method reportBadBlocks
-      dataNode.reportBadBlocks(block, dataNode.getFSDataset()
-          .getFsVolumeReferences().get(0));
-      Thread.sleep(3000);
-      BlockManagerTestUtil.updateState(cluster.getNamesystem()
-          .getBlockManager());
-      // Verify the bad block has been reported to namenode
-      Assert.assertEquals(1, cluster.getNamesystem().getCorruptReplicaBlocks());
-    } finally {
-      cluster.shutdown();
+      dataNode.reportBadBlocks(block, dataNode.getFSDataset().getFsVolumeReferences().get(0));
+      DataNodeTestUtils.triggerHeartbeat(dataNode);
+      BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
+      assertEquals("Corrupt replica blocks could not be reflected with the heartbeat", 1,
+          cluster.getNamesystem().getCorruptReplicaBlocks());
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index 8b691a11725..6af7e158fa0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.tools.GetGroups;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
@@ -652,6 +653,17 @@ public class TestObserverNode {
     }
   }
 
+  @Test
+  public void testGetListingForDeletedDir() throws Exception {
+    Path path = new Path("/dir1/dir2/testFile");
+    dfs.create(path).close();
+
+    assertTrue(dfs.delete(new Path("/dir1/dir2"), true));
+
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        () -> dfs.listLocatedStatus(new Path("/dir1/dir2")));
+  }
+
   @Test
   public void testSimpleReadEmptyDirOrFile() throws IOException {
     // read empty dir