From cd2501e54b0a27eed55e0b1bdd35bd8a8fd24fd6 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Sun, 28 Mar 2021 16:13:59 +0530
Subject: [PATCH] HDFS-15764. Notify Namenode missing or new block on disk as soon as possible. Contributed by Yang Yun.

When the DirectoryScanner finds that a replica is missing on disk, or that
a block has newly appeared on disk, the DataNode now notifies the NameNode
immediately instead of waiting for the next full block report. The number
of immediate notifications is capped by the new
dfs.datanode.directoryscan.max.notify.count key (default 5), and the
counter is reset once per block report interval.
---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  5 ++
 .../hadoop/hdfs/server/datanode/DNConf.java   |  4 ++
 .../fsdataset/impl/FsDatasetImpl.java         | 25 ++++++++
 .../src/main/resources/hdfs-default.xml       |  8 +++
 .../server/datanode/TestDirectoryScanner.java | 46 +++++++++-----
 .../fsdataset/impl/TestFsDatasetImpl.java     | 62 ++++++++++++++++++-
 6 files changed, 133 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 23897a502f1..ac2896871e0 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -876,6 +876,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
   public static final int
       DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = -1;
+  public static final String
+      DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY =
+      "dfs.datanode.directoryscan.max.notify.count";
+  public static final long
+      DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT = 5;
   public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
   public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
   public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index b56dd4ec223..7902694d909 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -459,4 +459,8 @@ public class DNConf {
   public long getProcessCommandsThresholdMs() {
     return processCommandsThresholdMs;
   }
+
+  public long getBlockReportInterval() {
+    return blockReportInterval;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index edf2edcb1b8..41791bb31a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -281,6 +281,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   final InstrumentedReadWriteLock datasetRWLock;
   private final Condition datasetWriteLockCondition;
   private static String blockPoolId = "";
+
+  // Limit how often the DirectoryScanner notifies the NameNode directly.
+  private long maxDirScannerNotifyCount;
+  private long curDirScannerNotifyCount;
+  private long lastDirScannerNotifyTime;

   /**
    * An FSDataset has a directory where it loads its data files.
@@ -408,6 +413,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     maxDataLength = conf.getInt(
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+    maxDirScannerNotifyCount = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY,
+        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT);
+    lastDirScannerNotifyTime = System.currentTimeMillis();
   }

   @Override
@@ -2609,6 +2618,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     long startTimeMs = Time.monotonicNow();
+    if (startTimeMs - lastDirScannerNotifyTime >
+        datanode.getDnConf().getBlockReportInterval()) {
+      curDirScannerNotifyCount = 0;
+      lastDirScannerNotifyTime = startTimeMs;
+    }
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
@@ -2661,6 +2675,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         // Block is in memory and not on the disk
         // Remove the block from volumeMap
         volumeMap.remove(bpid, blockId);
+        if (curDirScannerNotifyCount < maxDirScannerNotifyCount) {
+          curDirScannerNotifyCount++;
+          datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid,
+              memBlockInfo), memBlockInfo.getStorageUuid());
+        }
         if (vol.isTransientStorage()) {
           ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
         }
@@ -2687,6 +2706,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             .setDirectoryToUse(diskFile.getParentFile())
             .build();
         volumeMap.add(bpid, diskBlockInfo);
+        if (curDirScannerNotifyCount < maxDirScannerNotifyCount) {
+          curDirScannerNotifyCount++;
+          datanode.notifyNamenodeReceivedBlock(
+              new ExtendedBlock(bpid, diskBlockInfo), null,
+              vol.getStorageID(), vol.isTransientStorage());
+        }
         if (vol.isTransientStorage()) {
           long lockedBytesReserved =
               cacheManager.reserve(diskBlockInfo.getNumBytes()) > 0 ?
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 57e3b1bcb1f..c2ae4bc8291 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -6191,4 +6191,12 @@
     accessed or modified before the specified time interval.
   </description>
 </property>
+<property>
+  <name>dfs.datanode.directoryscan.max.notify.count</name>
+  <value>5</value>
+  <description>
+    Defines the maximum number of blocks that the DirectoryScanner may report
+    to the namenode right away for received or deleted blocks in one round.
+  </description>
+</property>
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 12b251fbb43..44d99a292b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -249,8 +249,7 @@ public class TestDirectoryScanner {
   }

   /** Create a block file in a random volume. */
-  private long createBlockFile() throws IOException {
-    long id = getFreeBlockId();
+  private long createBlockFile(long id) throws IOException {
     try (
         FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
       int numVolumes = volumes.size();
@@ -282,8 +281,7 @@ public class TestDirectoryScanner {
   }

   /** Create block file and corresponding metafile in a random volume.
   */
-  private long createBlockMetaFile() throws IOException {
-    long id = getFreeBlockId();
+  private long createBlockMetaFile(long id) throws IOException {
     try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
       int numVolumes = refs.size();
@@ -548,7 +546,7 @@ public class TestDirectoryScanner {

     // Test4: A block file exists for which there is no metafile and
     // a block in memory
-    blockId = createBlockFile();
+    blockId = createBlockFile(blockId);
     totalBlocks++;
     scan(totalBlocks, 1, 1, 0, 1, 0);
     verifyAddition(blockId, HdfsConstants.GRANDFATHER_GENERATION_STAMP, 0);
@@ -563,8 +561,12 @@ public class TestDirectoryScanner {
     scan(totalBlocks, 0, 0, 0, 0, 0);

     // Test6: A block file and metafile exists for which there is no block in memory
-    // memory
-    blockId = createBlockMetaFile();
+    blockId = deleteBlockFile();
+    scan(totalBlocks, 1, 0, 1, 0, 0);
+    totalBlocks--;
+    verifyDeletion(blockId);
+
+    blockId = createBlockMetaFile(blockId);
     totalBlocks++;
     scan(totalBlocks, 1, 0, 0, 1, 0);
     verifyAddition(blockId, DEFAULT_GEN_STAMP, 0);
@@ -577,9 +579,10 @@ public class TestDirectoryScanner {
     scan(totalBlocks, 10, 10, 0, 0, 10);
     scan(totalBlocks, 0, 0, 0, 0, 0);

-    // Test8: Delete bunch of block files
+    // Test8: Delete a bunch of block files and record the ids.
+    List<Long> ids = new ArrayList<>();
     for (int i = 0; i < 10; i++) {
-      blockId = deleteBlockFile();
+      ids.add(deleteBlockFile());
     }
     scan(totalBlocks, 10, 0, 10, 0, 0);
     totalBlocks -= 10;
@@ -587,7 +590,7 @@ public class TestDirectoryScanner {

     // Test9: create a bunch of block files
     for (int i = 0; i < 10; i++) {
-      blockId = createBlockFile();
+      blockId = createBlockFile(ids.get(i));
     }
     totalBlocks += 10;
     scan(totalBlocks, 10, 10, 0, 10, 0);
@@ -601,8 +604,15 @@ public class TestDirectoryScanner {
     scan(totalBlocks, 0, 0, 0, 0, 0);

     // Test11: create a bunch of block files and meta files
+    ids.clear();
     for (int i = 0; i < 10; i++) {
-      blockId = createBlockMetaFile();
+      ids.add(deleteBlockFile());
+    }
+    scan(totalBlocks, 10, 0, 10, 0, 0);
+    totalBlocks -= 10;
+
+    for (int i = 0; i < 10; i++) {
+      blockId = createBlockMetaFile(ids.get(i));
     }
     totalBlocks += 10;
     scan(totalBlocks, 10, 0, 0, 10, 0);
@@ -616,9 +626,16 @@ public class TestDirectoryScanner {
     scan(totalBlocks, 0, 0, 0, 0, 0);

     // Test13: all the conditions combined
+    long blockId1 = deleteBlockFile();
+    long blockId2 = deleteBlockFile();
+    scan(totalBlocks, 2, 0, 2, 0, 0);
+    totalBlocks -= 2;
+    verifyDeletion(blockId1);
+    verifyDeletion(blockId2);
+
     createMetaFile();
-    createBlockFile();
-    createBlockMetaFile();
+    createBlockFile(blockId1);
+    createBlockMetaFile(blockId2);
     deleteMetaFile();
     deleteBlockFile();
     truncateBlockFile();
@@ -631,9 +648,6 @@ public class TestDirectoryScanner {
     assertTrue("Report compiler threads logged no execution time",
         scanner.timeRunningMs.get() > 0L);

-    // Test15: validate clean shutdown of DirectoryScanner
-    //// assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not
-    // sim
     scanner.shutdown();
     assertFalse(scanner.getRunStatus());

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 8edc977a6bd..6ae6248d3f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;

 import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;

@@ -94,6 +95,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.hamcrest.core.Is.is;
@@ -1745,4 +1747,62 @@ public class TestFsDatasetImpl {
       assertTrue(blockDir.delete());
     }
   }
-}
+
+  @Test
+  public void testNotifyNamenodeMissingOrNewBlock() throws Exception {
+    long blockSize = 1024;
+    int heartbeatInterval = 1;
+    HdfsConfiguration c = new HdfsConfiguration();
+    c.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
+    c.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(c).
+        numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/f1"),
+          blockSize, (short)1, 0);
+      String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetSpi<?> fsdataset = dn.getFSDataset();
+      List<ReplicaInfo> replicaInfos =
+          fsdataset.getSortedFinalizedBlocks(bpid);
+      assertEquals(1, replicaInfos.size());
+
+      ReplicaInfo replicaInfo = replicaInfos.get(0);
+      String blockPath = replicaInfo.getBlockURI().getPath();
+      String metaPath = replicaInfo.getMetadataURI().getPath();
+      String blockTempPath = blockPath + ".tmp";
+      String metaTempPath = metaPath + ".tmp";
+      File blockFile = new File(blockPath);
+      File blockTempFile = new File(blockTempPath);
+      File metaFile = new File(metaPath);
+      File metaTempFile = new File(metaTempPath);
+
+      // rename the block and meta file away so the replica is missing on disk
+      blockFile.renameTo(blockTempFile);
+      metaFile.renameTo(metaTempFile);
+      assertFalse(blockFile.exists());
+      assertFalse(metaFile.exists());
+
+      FsVolumeSpi.ScanInfo info = new FsVolumeSpi.ScanInfo(
+          replicaInfo.getBlockId(), blockFile.getAbsoluteFile(),
+          metaFile.getAbsoluteFile(), replicaInfo.getVolume());
+      fsdataset.checkAndUpdate(bpid, info);
+
+      BlockManager blockManager = cluster.getNameNode().
+          getNamesystem().getBlockManager();
+      GenericTestUtils.waitFor(() ->
+          blockManager.getLowRedundancyBlocksCount() == 1, 100, 5000);
+
+      // move the block and meta file back
+      blockTempFile.renameTo(blockFile);
+      metaTempFile.renameTo(metaFile);
+
+      fsdataset.checkAndUpdate(bpid, info);
+      GenericTestUtils.waitFor(() ->
+          blockManager.getLowRedundancyBlocksCount() == 0, 100, 5000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
\ No newline at end of file
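
Usage note (illustrative, not part of the patch): an operator could tune the
cap introduced above through hdfs-site.xml on the DataNodes. The key name and
its default of 5 come from the patch itself; the value 50 below is an
arbitrary example, and the counter resets once per block report interval as
implemented in FsDatasetImpl.

<property>
  <name>dfs.datanode.directoryscan.max.notify.count</name>
  <value>50</value>
  <description>
    Allow up to 50 blocks to be reported to the NameNode right away when a
    DirectoryScanner round finds missing or newly appeared replicas.
  </description>
</property>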