Merge branch 'apache:trunk' into YARN-11093

commit 837cc9ba41
Author: Shailesh Gupta
Date:   2023-03-02 14:05:29 +05:30 (committed by GitHub)
9 changed files with 93 additions and 49 deletions

KMSConfiguration.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.crypto.key.kms.server;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.PropertyConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,8 +104,6 @@ public class KMSConfiguration {
   public static final boolean KEY_AUTHORIZATION_ENABLE_DEFAULT = true;
 
-  private static final String LOG4J_PROPERTIES = "kms-log4j.properties";
-
   static {
     Configuration.addDefaultResource(KMS_DEFAULT_XML);
     Configuration.addDefaultResource(KMS_SITE_XML);
@@ -163,31 +161,20 @@ public class KMSConfiguration {
     return newer;
   }
 
-  public static void initLogging() {
-    String confDir = System.getProperty(KMS_CONFIG_DIR);
-    if (confDir == null) {
-      throw new RuntimeException("System property '" +
-          KMSConfiguration.KMS_CONFIG_DIR + "' not defined");
+  /**
+   * Validate whether "kms.config.dir" and "log4j.configuration" are defined in the System
+   * properties. If not, abort the KMS WebServer.
+   */
+  public static void validateSystemProps() {
+    if (System.getProperty(KMS_CONFIG_DIR) == null) {
+      String errorMsg = "System property '" + KMS_CONFIG_DIR + "' not defined";
+      System.err.println("Aborting KMSWebServer because " + errorMsg);
+      throw new RuntimeException(errorMsg);
     }
     if (System.getProperty("log4j.configuration") == null) {
-      System.setProperty("log4j.defaultInitOverride", "true");
-      boolean fromClasspath = true;
-      File log4jConf = new File(confDir, LOG4J_PROPERTIES).getAbsoluteFile();
-      if (log4jConf.exists()) {
-        PropertyConfigurator.configureAndWatch(log4jConf.getPath(), 1000);
-        fromClasspath = false;
-      } else {
-        ClassLoader cl = Thread.currentThread().getContextClassLoader();
-        URL log4jUrl = cl.getResource(LOG4J_PROPERTIES);
-        if (log4jUrl != null) {
-          PropertyConfigurator.configure(log4jUrl);
-        }
-      }
-      LOG.debug("KMS log starting");
-      if (fromClasspath) {
-        LOG.warn("Log4j configuration file '{}' not found", LOG4J_PROPERTIES);
-        LOG.warn("Logging with INFO level to standard output");
-      }
+      String errorMsg = "System property 'log4j.configuration' not defined";
+      System.err.println("Aborting KMSWebServer because " + errorMsg);
+      throw new RuntimeException(errorMsg);
     }
   }
 }
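For reference, a minimal standalone sketch of the fail-fast contract introduced above (illustrative only; the class name and paths below are placeholders, while the property keys come from the patch): the launcher must supply both system properties, normally as -D JVM options, before KMSConfiguration.validateSystemProps() runs, otherwise startup aborts with a RuntimeException.

// Illustrative sketch, not part of the patch.
public final class KmsStartupContractSketch {
  public static void main(String[] args) {
    // kms.sh is expected to pass these as JVM options; they are set here
    // only to demonstrate the contract (paths are placeholders).
    System.setProperty("kms.config.dir", "/etc/hadoop/conf");
    System.setProperty("log4j.configuration",
        "file:/etc/hadoop/conf/kms-log4j.properties");

    for (String prop : new String[] {"kms.config.dir", "log4j.configuration"}) {
      if (System.getProperty(prop) == null) {
        String errorMsg = "System property '" + prop + "' not defined";
        System.err.println("Aborting KMSWebServer because " + errorMsg);
        throw new RuntimeException(errorMsg);
      }
    }
    System.out.println("Required properties present; startup would continue.");
  }
}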

KMSWebServer.java

@@ -185,7 +185,7 @@ public class KMSWebServer {
   }
 
   public static void main(String[] args) throws Exception {
-    KMSConfiguration.initLogging();
+    KMSConfiguration.validateSystemProps();
     StringUtils.startupShutdownMessage(KMSWebServer.class, args, LOG);
     Configuration conf = KMSConfiguration.getKMSConf();
     Configuration sslConf = SSLFactory.readSSLConfiguration(conf, SSLFactory.Mode.SERVER);

KMS shell profile (hadoop_subcommand_kms)

@@ -49,6 +49,8 @@ function hadoop_subcommand_kms
       "-Dkms.config.dir=${HADOOP_CONF_DIR}"
     hadoop_add_param HADOOP_OPTS "-Dkms.log.dir=" \
       "-Dkms.log.dir=${HADOOP_LOG_DIR}"
+    hadoop_add_param HADOOP_OPTS "-Dlog4j.configuration=" \
+      "-Dlog4j.configuration=file:${HADOOP_CONF_DIR}/kms-log4j.properties"
 
     if [[ "${HADOOP_DAEMON_MODE}" == "default" ]] ||
        [[ "${HADOOP_DAEMON_MODE}" == "start" ]]; then

DFSInputStream.java

@@ -224,7 +224,7 @@ public class DFSInputStream extends FSInputStream
   }
 
   /**
-   * Grab the open-file info from namenode
+   * Grab the open-file info from namenode.
    * @param refreshLocatedBlocks whether to re-fetch locatedblocks
    */
   void openInfo(boolean refreshLocatedBlocks) throws IOException {
@@ -940,7 +940,8 @@ public class DFSInputStream extends FSInputStream
    * @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is
    * false.
    */
-  private DNAddrPair chooseDataNode(LocatedBlock block,
+  @VisibleForTesting
+  DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
       throws IOException {
     while (true) {
@@ -955,6 +956,14 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
+  /**
+   * RefetchLocations should only be called when there are no active requests
+   * to datanodes. In the hedged read case this means futures should be empty.
+   * @param block The locatedBlock to get new datanode locations for.
+   * @param ignoredNodes A list of ignored nodes. This list can be null and can be cleared.
+   * @return the locatedBlock with updated datanode locations.
+   * @throws IOException
+   */
   private LocatedBlock refetchLocations(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
@@ -999,13 +1008,24 @@ public class DFSInputStream extends FSInputStream
       throw new InterruptedIOException(
           "Interrupted while choosing DataNode for read.");
     }
-    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
+    clearCachedNodeState(ignoredNodes);
     openInfo(true);
     block = refreshLocatedBlock(block);
     failures++;
     return block;
   }
 
+  /**
+   * Clear both the dead nodes and the ignored nodes
+   * @param ignoredNodes is cleared
+   */
+  private void clearCachedNodeState(Collection<DatanodeInfo> ignoredNodes) {
+    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
+    if (ignoredNodes != null) {
+      ignoredNodes.clear();
+    }
+  }
+
   /**
    * Get the best node from which to stream the data.
    * @param block LocatedBlock, containing nodes in priority order.
@@ -1337,8 +1357,12 @@ public class DFSInputStream extends FSInputStream
         } catch (InterruptedException ie) {
           // Ignore and retry
         }
-        if (refetch) {
-          refetchLocations(block, ignored);
+        // If refetch is true, then all nodes are in deadNodes or ignoredNodes.
+        // We should loop through all futures and remove them, so we do not
+        // have concurrent requests to the same node.
+        // Once all futures are cleared, we can clear the ignoredNodes and retry.
+        if (refetch && futures.isEmpty()) {
+          block = refetchLocations(block, ignored);
         }
         // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
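A simplified, self-contained sketch of the guard added above (not the actual DFSInputStream code; the types and method names below are stand-ins): locations are re-fetched and the per-read node state cleared only once no hedged-read futures remain in flight, so the retry can never race with an outstanding request to an ignored datanode.

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;

// Stand-in types: DFSInputStream works with LocatedBlock and DatanodeInfo instead.
class HedgedRefetchGuardSketch {
  private final Set<String> localDeadNodes = new HashSet<>();

  /** Re-fetch only when no hedged requests remain, then reset cached node state. */
  String maybeRefetch(String block, boolean refetch,
      List<Future<byte[]>> futures, Set<String> ignoredNodes) {
    if (refetch && futures.isEmpty()) {
      // Safe point: nothing can still be reading from an ignored or dead node.
      localDeadNodes.clear();
      if (ignoredNodes != null) {
        ignoredNodes.clear();
      }
      return refetchLocations(block); // analogous to refetchLocations(block, ignored)
    }
    return block; // keep the current locations while requests are outstanding
  }

  private String refetchLocations(String block) {
    // Placeholder for asking the namenode for fresh block locations.
    return block;
  }
}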

FSNamesystem.java

@@ -4174,7 +4174,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       logAuditEvent(false, operationName, src);
       throw e;
     }
-    if (needLocation && isObserver()) {
+    if (dl != null && needLocation && isObserver()) {
       for (HdfsFileStatus fs : dl.getPartialListing()) {
         if (fs instanceof HdfsLocatedFileStatus) {
           LocatedBlocks lbs = ((HdfsLocatedFileStatus) fs).getLocatedBlocks();

TestDFSInputStreamBlockLocations.java

@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -35,11 +36,14 @@ import java.util.Map;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -200,6 +204,25 @@ public class TestDFSInputStreamBlockLocations {
     testWithRegistrationMethod(DFSInputStream::getAllBlocks);
   }
 
+  /**
+   * If the ignoreList contains all datanodes, the ignoredList should be cleared to take advantage
+   * of retries built into chooseDataNode. This is needed for hedged reads
+   * @throws IOException
+   */
+  @Test
+  public void testClearIgnoreListChooseDataNode() throws IOException {
+    final String fileName = "/test_cache_locations";
+    filePath = createFile(fileName);
+    try (DFSInputStream fin = dfsClient.open(fileName)) {
+      LocatedBlocks existing = fin.locatedBlocks;
+      LocatedBlock block = existing.getLastLocatedBlock();
+      ArrayList<DatanodeInfo> ignoreList = new ArrayList<>(Arrays.asList(block.getLocations()));
+      Assert.assertNotNull(fin.chooseDataNode(block, ignoreList, true));
+      Assert.assertEquals(0, ignoreList.size());
+    }
+  }
+
   @FunctionalInterface
   interface ThrowingConsumer {
     void accept(DFSInputStream fin) throws IOException;

TestPread.java

@@ -603,7 +603,9 @@ public class TestPread {
       input.read(0, buffer, 0, 1024);
       Assert.fail("Reading the block should have thrown BlockMissingException");
     } catch (BlockMissingException e) {
-      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+      // The result of 9 is due to 2 blocks by 4 iterations plus one because
+      // hedgedReadOpsLoopNumForTesting is incremented at start of the loop.
+      assertEquals(9, input.getHedgedReadOpsLoopNumForTesting());
       assertTrue(metrics.getHedgedReadOps() == 0);
     } finally {
       Mockito.reset(injector);
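For reference, the count the test now expects works out to 2 x 4 + 1 = 9: two blocks, four passes through the hedged-read loop per block, plus one extra count because the counter is incremented at the start of the loop before it exits.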

TestFsDatasetImpl.java

@@ -1075,16 +1075,14 @@ public class TestFsDatasetImpl {
   @Test(timeout = 30000)
   public void testReportBadBlocks() throws Exception {
     boolean threwException = false;
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration config = new HdfsConfiguration();
-      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+    final Configuration config = new HdfsConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
+        .numDataNodes(1).build()) {
       cluster.waitActive();
       Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks());
       DataNode dataNode = cluster.getDataNodes().get(0);
-      ExtendedBlock block =
-          new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), 0);
+      ExtendedBlock block = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), 0);
       try {
         // Test the reportBadBlocks when the volume is null
        dataNode.reportBadBlocks(block);
@@ -1101,15 +1099,11 @@ public class TestFsDatasetImpl {
       block = DFSTestUtil.getFirstBlock(fs, filePath);
       // Test for the overloaded method reportBadBlocks
-      dataNode.reportBadBlocks(block, dataNode.getFSDataset()
-          .getFsVolumeReferences().get(0));
-      Thread.sleep(3000);
-      BlockManagerTestUtil.updateState(cluster.getNamesystem()
-          .getBlockManager());
-      // Verify the bad block has been reported to namenode
-      Assert.assertEquals(1, cluster.getNamesystem().getCorruptReplicaBlocks());
-    } finally {
-      cluster.shutdown();
+      dataNode.reportBadBlocks(block, dataNode.getFSDataset().getFsVolumeReferences().get(0));
+      DataNodeTestUtils.triggerHeartbeat(dataNode);
+      BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
+      assertEquals("Corrupt replica blocks could not be reflected with the heartbeat", 1,
+          cluster.getNamesystem().getCorruptReplicaBlocks());
     }
   }

TestObserverNode.java

@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.tools.GetGroups;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
@@ -652,6 +653,17 @@ public class TestObserverNode {
     }
   }
 
+  @Test
+  public void testGetListingForDeletedDir() throws Exception {
+    Path path = new Path("/dir1/dir2/testFile");
+    dfs.create(path).close();
+    assertTrue(dfs.delete(new Path("/dir1/dir2"), true));
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        () -> dfs.listLocatedStatus(new Path("/dir1/dir2")));
+  }
+
   @Test
   public void testSimpleReadEmptyDirOrFile() throws IOException {
     // read empty dir