From b5e7f59e5362eb7c133d3bfcf4e1166930dda00f Mon Sep 17 00:00:00 2001 From: Xiangyi Zhu <82511136+zhuxiangyi@users.noreply.github.com> Date: Sat, 15 Jan 2022 23:18:05 +0800 Subject: [PATCH] HDFS-16043. Add markedDeleteBlockScrubberThread to delete blocks asynchronously (#3882). Contributed by Xiangyi Zhu. Signed-off-by: He Xiaoqiao --- .../server/blockmanagement/BlockManager.java | 105 +++++++++++++++++- .../hdfs/server/namenode/FSNamesystem.java | 45 ++------ .../namenode/metrics/NameNodeMetrics.java | 16 +++ .../hdfs/TestBlocksScheduledCounter.java | 2 + .../org/apache/hadoop/hdfs/TestDFSRename.java | 3 + .../apache/hadoop/hdfs/TestFileCreation.java | 5 + .../hdfs/TestReadStripedFileWithDecoding.java | 5 +- .../blockmanagement/BlockManagerTestUtil.java | 18 ++- .../TestComputeInvalidateWork.java | 2 + .../impl/TestLazyPersistLockedMemory.java | 3 + .../namenode/TestDecommissioningStatus.java | 2 + .../server/namenode/TestFileTruncate.java | 47 ++++---- .../namenode/TestLargeDirectoryDelete.java | 3 + .../hdfs/server/namenode/TestMetaSave.java | 2 + .../server/namenode/TestNameNodeMXBean.java | 39 +++++-- .../server/namenode/ha/TestHASafeMode.java | 4 + .../namenode/metrics/TestNameNodeMetrics.java | 2 + .../snapshot/TestSnapshotDeletion.java | 9 +- 18 files changed, 242 insertions(+), 70 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index d1829cdbbed..48ac5ed6ead 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import javax.management.ObjectName; @@ -190,6 +191,9 @@ public class BlockManager implements BlockStatsMXBean { private volatile long lowRedundancyBlocksCount = 0L; private volatile long scheduledReplicationBlocksCount = 0L; + private final long deleteBlockLockTimeMs = 500; + private final long deleteBlockUnlockIntervalTimeMs = 100; + /** flag indicating whether replication queues have been initialized */ private boolean initializedReplQueues; @@ -323,6 +327,12 @@ public class BlockManager implements BlockStatsMXBean { * {@link #redundancyThread} has run at least one full iteration. */ private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1); + /** + * markedDeleteBlockScrubber thread for handling async delete blocks. + */ + private final Daemon markedDeleteBlockScrubberThread = + new Daemon(new MarkedDeleteBlockScrubber()); + /** Block report thread for handling async reports. */ private final BlockReportProcessingThread blockReportThread; @@ -421,6 +431,12 @@ public class BlockManager implements BlockStatsMXBean { */ private int numBlocksPerIteration; + /** + * The blocks of deleted files are put into the queue, + * and the cleanup thread processes these blocks periodically. + */ + private final ConcurrentLinkedQueue> markedDeleteQueue; + /** * Progress of the Reconstruction queues initialisation. 
*/ @@ -474,7 +490,7 @@ public class BlockManager implements BlockStatsMXBean { datanodeManager.getBlockInvalidateLimit(), startupDelayBlockDeletionInMs, blockIdManager); - + markedDeleteQueue = new ConcurrentLinkedQueue<>(); // Compute the map capacity by allocating 2% of total memory blocksMap = new BlocksMap( LightWeightGSet.computeCapacity(2.0, "BlocksMap")); @@ -724,6 +740,9 @@ public class BlockManager implements BlockStatsMXBean { datanodeManager.activate(conf); this.redundancyThread.setName("RedundancyMonitor"); this.redundancyThread.start(); + this.markedDeleteBlockScrubberThread. + setName("MarkedDeleteBlockScrubberThread"); + this.markedDeleteBlockScrubberThread.start(); this.blockReportThread.start(); mxBeanName = MBeans.register("NameNode", "BlockStats", this); bmSafeMode.activate(blockTotal); @@ -737,8 +756,10 @@ public class BlockManager implements BlockStatsMXBean { try { redundancyThread.interrupt(); blockReportThread.interrupt(); + markedDeleteBlockScrubberThread.interrupt(); redundancyThread.join(3000); blockReportThread.join(3000); + markedDeleteBlockScrubberThread.join(3000); } catch (InterruptedException ie) { } datanodeManager.close(); @@ -4877,6 +4898,77 @@ public class BlockManager implements BlockStatsMXBean { return lastRedundancyCycleTS.get(); } + /** + * Periodically deletes the marked block. + */ + private class MarkedDeleteBlockScrubber implements Runnable { + private Iterator toDeleteIterator = null; + private boolean isSleep; + private NameNodeMetrics metrics; + + private void remove(long time) { + if (checkToDeleteIterator()) { + namesystem.writeLock(); + try { + while (toDeleteIterator.hasNext()) { + removeBlock(toDeleteIterator.next()); + metrics.decrPendingDeleteBlocksCount(); + if (Time.monotonicNow() - time > deleteBlockLockTimeMs) { + isSleep = true; + break; + } + } + } finally { + namesystem.writeUnlock(); + } + } + } + + private boolean checkToDeleteIterator() { + return toDeleteIterator != null && toDeleteIterator.hasNext(); + } + + @Override + public void run() { + LOG.info("Start MarkedDeleteBlockScrubber thread"); + while (namesystem.isRunning() && + !Thread.currentThread().isInterrupted()) { + if (!markedDeleteQueue.isEmpty() || checkToDeleteIterator()) { + try { + metrics = NameNode.getNameNodeMetrics(); + metrics.setDeleteBlocksQueued(markedDeleteQueue.size()); + isSleep = false; + long startTime = Time.monotonicNow(); + remove(startTime); + while (!isSleep && !markedDeleteQueue.isEmpty() && + !Thread.currentThread().isInterrupted()) { + List markedDeleteList = markedDeleteQueue.poll(); + if (markedDeleteList != null) { + toDeleteIterator = markedDeleteList.listIterator(); + } + remove(startTime); + } + } catch (Exception e){ + LOG.warn("MarkedDeleteBlockScrubber encountered an exception" + + " during the block deletion process, " + + " the deletion of the block will retry in {} millisecond.", + deleteBlockUnlockIntervalTimeMs, e); + } + } + if (isSleep) { + LOG.debug("Clear markedDeleteQueue over {}" + + " millisecond to release the write lock", deleteBlockLockTimeMs); + } + try { + Thread.sleep(deleteBlockUnlockIntervalTimeMs); + } catch (InterruptedException e) { + LOG.info("Stopping MarkedDeleteBlockScrubber."); + break; + } + } + } + } + /** * Periodically calls computeBlockRecoveryWork(). 
*/ @@ -5223,6 +5315,17 @@ public class BlockManager implements BlockStatsMXBean { return blockIdManager; } + @VisibleForTesting + public ConcurrentLinkedQueue> getMarkedDeleteQueue() { + return markedDeleteQueue; + } + + public void addBLocksToMarkedDeleteQueue(List blockInfos) { + markedDeleteQueue.add(blockInfos); + NameNode.getNameNodeMetrics(). + incrPendingDeleteBlocksCount(blockInfos.size()); + } + public long nextGenerationStamp(boolean legacyBlock) throws IOException { return blockIdManager.nextGenerationStamp(legacyBlock); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 8daa474bab2..243f62295ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2277,8 +2277,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } getEditLog().logSync(); if (!toRemoveBlocks.getToDeleteList().isEmpty()) { - removeBlocks(toRemoveBlocks); - toRemoveBlocks.clear(); + blockManager.addBLocksToMarkedDeleteQueue( + toRemoveBlocks.getToDeleteList()); } logAuditEvent(true, operationName, src, null, r.getFileStatus()); } catch (AccessControlException e) { @@ -2717,8 +2717,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, if (!skipSync) { getEditLog().logSync(); if (toRemoveBlocks != null) { - removeBlocks(toRemoveBlocks); - toRemoveBlocks.clear(); + blockManager.addBLocksToMarkedDeleteQueue( + toRemoveBlocks.getToDeleteList()); } } } @@ -3236,8 +3236,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, BlocksMapUpdateInfo collectedBlocks = res.collectedBlocks; if (!collectedBlocks.getToDeleteList().isEmpty()) { - removeBlocks(collectedBlocks); - collectedBlocks.clear(); + blockManager.addBLocksToMarkedDeleteQueue( + collectedBlocks.getToDeleteList()); } logAuditEvent(true, operationName + " (options=" + @@ -3276,7 +3276,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, getEditLog().logSync(); logAuditEvent(ret, operationName, src); if (toRemovedBlocks != null) { - removeBlocks(toRemovedBlocks); // Incremental deletion of blocks + blockManager.addBLocksToMarkedDeleteQueue( + toRemovedBlocks.getToDeleteList()); } return ret; } @@ -3286,30 +3287,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return dir.getPermissionChecker(); } - /** - * From the given list, incrementally remove the blocks from blockManager - * Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to - * ensure that other waiters on the lock can get in. 
See HDFS-2938 - * - * @param blocks - * An instance of {@link BlocksMapUpdateInfo} which contains a list - * of blocks that need to be removed from blocksMap - */ - void removeBlocks(BlocksMapUpdateInfo blocks) { - List toDeleteList = blocks.getToDeleteList(); - Iterator iter = toDeleteList.iterator(); - while (iter.hasNext()) { - writeLock(); - try { - for (int i = 0; i < blockDeletionIncrement && iter.hasNext(); i++) { - blockManager.removeBlock(iter.next()); - } - } finally { - writeUnlock("removeBlocks"); - } - } - } - /** * Remove leases and inodes related to a given path * @param removedUCFiles INodes whose leases need to be released @@ -4508,7 +4485,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, INodesInPath.fromINode((INodeFile) bc), false); changed |= toRemoveBlocks != null; if (toRemoveBlocks != null) { - removeBlocks(toRemoveBlocks); // Incremental deletion of blocks + blockManager.addBLocksToMarkedDeleteQueue( + toRemoveBlocks.getToDeleteList()); } } } finally { @@ -7170,7 +7148,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // Breaking the pattern as removing blocks have to happen outside of the // global lock if (blocksToBeDeleted != null) { - removeBlocks(blocksToBeDeleted); + blockManager.addBLocksToMarkedDeleteQueue( + blocksToBeDeleted.getToDeleteList()); } logAuditEvent(true, operationName, rootPath, null, null); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java index 4df9b5d6575..5e318277c87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java @@ -87,6 +87,10 @@ public class NameNodeMetrics { MutableCounterLong blockOpsBatched; @Metric("Number of pending edits") MutableGaugeInt pendingEditsCount; + @Metric("Number of delete blocks Queued") + MutableGaugeInt deleteBlocksQueued; + @Metric("Number of pending deletion blocks") + MutableGaugeInt pendingDeleteBlocksCount; @Metric("Number of file system operations") public long totalFileOps(){ @@ -334,6 +338,18 @@ public class NameNodeMetrics { blockOpsQueued.set(size); } + public void setDeleteBlocksQueued(int size) { + deleteBlocksQueued.set(size); + } + + public void incrPendingDeleteBlocksCount(int size) { + pendingDeleteBlocksCount.incr(size); + } + + public void decrPendingDeleteBlocksCount() { + pendingDeleteBlocksCount.decr(); + } + public void addBlockOpsBatched(int count) { blockOpsBatched.incr(count); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java index 95d6825d297..d86700b39b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java @@ -190,6 +190,8 @@ public class TestBlocksScheduledCounter { // 4. 
delete the file dfs.delete(filePath, true); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); int blocksScheduled = 0; for (DatanodeDescriptor descriptor : dnList) { if (descriptor.getBlocksScheduled() != 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRename.java index fe2eee28b75..427dc43d3bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRename.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRename.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.test.GenericTestUtils; @@ -161,6 +162,8 @@ public class TestDFSRename { assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock(). getLocalBlock()) != null); dfs.rename(srcPath, dstPath, Rename.OVERWRITE); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock(). getLocalBlock()) == null); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 93687b680a8..ae566fcc842 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; @@ -1350,6 +1351,8 @@ public class TestFileCreation { assertBlocks(bm, oldBlocks, true); out = dfs.create(filePath, true); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); byte[] newData = AppendTestUtil.randomBytes(seed, fileSize); try { out.write(newData); @@ -1357,6 +1360,8 @@ public class TestFileCreation { out.close(); } dfs.deleteOnExit(filePath); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); LocatedBlocks newBlocks = NameNodeAdapter.getBlockLocations( nn, file, 0, fileSize); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java index 2fb9212f354..132eb611a2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; @@ -127,7 +128,7 @@ public class TestReadStripedFileWithDecoding { } @Test - public void testInvalidateBlock() throws IOException { + public void testInvalidateBlock() throws IOException, InterruptedException { final Path file = new Path("/invalidate"); final int length = 10; final byte[] bytes = StripedFileTestUtil.generateBytes(length); @@ -151,6 +152,8 @@ public class TestReadStripedFileWithDecoding { try { // delete the file dfs.delete(file, true); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem().getBlockManager()); // check the block is added to invalidateBlocks final FSNamesystem fsn = cluster.getNamesystem(); final BlockManager bm = fsn.getBlockManager(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 2b8804c12cc..433877aa706 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -39,6 +39,9 @@ import org.junit.Assert; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; public class BlockManagerTestUtil { + + static final long SLEEP_TIME = 1000; + public static void setNodeReplicationLimit(final BlockManager blockManager, final int limit) { blockManager.maxReplicationStreams = limit; @@ -178,7 +181,20 @@ public class BlockManagerTestUtil { */ public static CorruptReplicasMap getCorruptReplicas(final BlockManager blockManager){ return blockManager.corruptReplicas; - + + } + + /** + * Wait for the processing of the marked deleted block to complete. 
+ */ + public static void waitForMarkedDeleteQueueIsEmpty( + BlockManager blockManager) throws InterruptedException { + while (true) { + if (blockManager.getMarkedDeleteQueue().isEmpty()) { + return; + } + Thread.sleep(SLEEP_TIME); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java index d7920a75c13..4ae0316fa7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java @@ -253,6 +253,8 @@ public class TestComputeInvalidateWork { } dfs.delete(path, false); dfs.delete(ecFile, false); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); namesystem.writeLock(); InvalidateBlocks invalidateBlocks; int totalStripedDataBlocks = totalBlockGroups * (ecPolicy.getNumDataUnits() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java index 2d54c480461..699854cd175 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.test.GenericTestUtils; @@ -175,6 +176,8 @@ public class TestLazyPersistLockedMemory extends LazyPersistTestCase { // Delete the file and ensure locked RAM goes to zero. fs.delete(path, false); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); waitForLockedBytesUsed(fsd, 0); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index e8bd8377a3c..420aa8c1af7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java @@ -445,6 +445,8 @@ public class TestDecommissioningStatus { // Delete the under-replicated file, which should let the // DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED AdminStatesBaseTest.cleanupFile(fileSys, f); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); BlockManagerTestUtil.recheckDecommissionState(dm); // Block until the admin's monitor updates the number of tracked nodes. 
waitForDecommissionedNodes(dm.getDatanodeAdminManager(), 0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java index 5c75abb0e69..9cff6f8d274 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java @@ -33,6 +33,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.test.LambdaTestUtils; @@ -321,7 +322,8 @@ public class TestFileTruncate { } @Test - public void testSnapshotWithAppendTruncate() throws IOException { + public void testSnapshotWithAppendTruncate() + throws IOException, InterruptedException { testSnapshotWithAppendTruncate(0, 1, 2); testSnapshotWithAppendTruncate(0, 2, 1); testSnapshotWithAppendTruncate(1, 0, 2); @@ -335,7 +337,8 @@ public class TestFileTruncate { * Delete snapshots in the specified order and verify that * remaining snapshots are still readable. */ - void testSnapshotWithAppendTruncate(int ... deleteOrder) throws IOException { + void testSnapshotWithAppendTruncate(int... deleteOrder) + throws IOException, InterruptedException { FSDirectory fsDir = cluster.getNamesystem().getFSDirectory(); fs.mkdirs(parent); fs.setQuota(parent, 100, 1000); @@ -383,16 +386,16 @@ public class TestFileTruncate { // Truncate to block boundary int newLength = length[0] + BLOCK_SIZE / 2; boolean isReady = fs.truncate(src, newLength); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); assertTrue("Recovery is not expected.", isReady); assertFileLength(snapshotFiles[2], length[2]); assertFileLength(snapshotFiles[1], length[1]); assertFileLength(snapshotFiles[0], length[0]); assertBlockNotPresent(appendedBlk); - // Diskspace consumed should be 16 bytes * 3. [blk 1,2,3 SS:4] contentSummary = fs.getContentSummary(parent); assertThat(contentSummary.getSpaceConsumed(), is(48L)); - // Truncate full block again newLength = length[0] - BLOCK_SIZE / 2; isReady = fs.truncate(src, newLength); @@ -400,11 +403,9 @@ public class TestFileTruncate { assertFileLength(snapshotFiles[2], length[2]); assertFileLength(snapshotFiles[1], length[1]); assertFileLength(snapshotFiles[0], length[0]); - // Diskspace consumed should be 16 bytes * 3. [blk 1,2 SS:3,4] contentSummary = fs.getContentSummary(parent); assertThat(contentSummary.getSpaceConsumed(), is(48L)); - // Truncate half of the last block newLength -= BLOCK_SIZE / 2; isReady = fs.truncate(src, newLength); @@ -415,15 +416,12 @@ public class TestFileTruncate { assertFileLength(snapshotFiles[0], length[0]); Block replacedBlk = getLocatedBlocks(src).getLastLocatedBlock() .getBlock().getLocalBlock(); - // Diskspace consumed should be 16 bytes * 3. [blk 1,6 SS:2,3,4] contentSummary = fs.getContentSummary(parent); assertThat(contentSummary.getSpaceConsumed(), is(54L)); - snapshotDir = fs.createSnapshot(parent, ss[3]); snapshotFiles[3] = new Path(snapshotDir, truncateFile); length[3] = newLength; - // Delete file. 
Should still be able to read snapshots int numINodes = fsDir.getInodeMapSize(); isReady = fs.delete(src, false); @@ -434,17 +432,15 @@ public class TestFileTruncate { assertFileLength(snapshotFiles[0], length[0]); assertEquals("Number of INodes should not change", numINodes, fsDir.getInodeMapSize()); - fs.deleteSnapshot(parent, ss[3]); - + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); assertBlockExists(firstBlk); assertBlockExists(lastBlk); assertBlockNotPresent(replacedBlk); - // Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4] contentSummary = fs.getContentSummary(parent); assertThat(contentSummary.getSpaceConsumed(), is(48L)); - // delete snapshots in the specified order fs.deleteSnapshot(parent, ss[deleteOrder[0]]); assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]); @@ -453,12 +449,12 @@ public class TestFileTruncate { assertBlockExists(lastBlk); assertEquals("Number of INodes should not change", numINodes, fsDir.getInodeMapSize()); - // Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4] contentSummary = fs.getContentSummary(parent); assertThat(contentSummary.getSpaceConsumed(), is(48L)); - fs.deleteSnapshot(parent, ss[deleteOrder[1]]); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]); assertBlockExists(firstBlk); contentSummary = fs.getContentSummary(parent); @@ -472,11 +468,11 @@ public class TestFileTruncate { } assertEquals("Number of INodes should not change", numINodes, fsDir .getInodeMapSize()); - fs.deleteSnapshot(parent, ss[deleteOrder[2]]); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); assertBlockNotPresent(firstBlk); assertBlockNotPresent(lastBlk); - // Diskspace consumed should be 0 bytes * 3. [] contentSummary = fs.getContentSummary(parent); assertThat(contentSummary.getSpaceConsumed(), is(0L)); @@ -490,7 +486,8 @@ public class TestFileTruncate { * remaining snapshots are still readable. */ @Test - public void testSnapshotWithTruncates() throws IOException { + public void testSnapshotWithTruncates() + throws IOException, InterruptedException { testSnapshotWithTruncates(0, 1, 2); testSnapshotWithTruncates(0, 2, 1); testSnapshotWithTruncates(1, 0, 2); @@ -499,7 +496,8 @@ public class TestFileTruncate { testSnapshotWithTruncates(2, 1, 0); } - void testSnapshotWithTruncates(int ... deleteOrder) throws IOException { + void testSnapshotWithTruncates(int... 
deleteOrder) + throws IOException, InterruptedException { fs.mkdirs(parent); fs.setQuota(parent, 100, 1000); fs.allowSnapshot(parent); @@ -546,6 +544,8 @@ public class TestFileTruncate { assertThat(contentSummary.getSpaceConsumed(), is(42L)); fs.deleteSnapshot(parent, ss[deleteOrder[0]]); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]); assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]); assertFileLength(src, length[2]); @@ -563,6 +563,8 @@ public class TestFileTruncate { } fs.deleteSnapshot(parent, ss[deleteOrder[1]]); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]); assertFileLength(src, length[2]); assertBlockExists(firstBlk); @@ -583,6 +585,8 @@ public class TestFileTruncate { } fs.deleteSnapshot(parent, ss[deleteOrder[2]]); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); assertFileLength(src, length[2]); assertBlockExists(firstBlk); @@ -592,6 +596,8 @@ public class TestFileTruncate { assertThat(contentSummary.getLength(), is(6L)); fs.delete(src, false); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem().getBlockManager()); assertBlockNotPresent(firstBlk); // Diskspace consumed should be 0 bytes * 3. [] @@ -1258,7 +1264,8 @@ public class TestFileTruncate { cluster.getNamesystem().getFSDirectory().getBlockManager() .getTotalBlocks()); fs.delete(p, true); - + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem().getBlockManager()); assertEquals("block num should 0", 0, cluster.getNamesystem().getFSDirectory().getBlockManager() .getTotalBlocks()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java index df36322e9f7..9736086950f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; import java.util.Random; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -140,6 +141,8 @@ public class TestLargeDirectoryDelete { final long start = Time.now(); mc.getFileSystem().delete(new Path("/root"), true); // recursive delete + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + mc.getNamesystem(0).getBlockManager()); final long end = Time.now(); threads[0].endThread(); threads[1].endThread(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java index c88570b56e0..4387f7679bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java @@ -139,6 +139,8 @@ public class TestMetaSave { 
nnRpc.delete("/filestatus0", true); nnRpc.delete("/filestatus1", true); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem().getBlockManager()); nnRpc.metaSave("metasaveAfterDelete.out.txt"); // Verification diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java index 81c9cb86700..d670025bf50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java @@ -1031,7 +1031,8 @@ public class TestNameNodeMXBean { @Test public void testTotalBlocksMetrics() throws Exception { MiniDFSCluster cluster = null; - FSNamesystem namesystem = null; + FSNamesystem activeNn = null; + FSNamesystem standbyNn = null; DistributedFileSystem fs = null; try { Configuration conf = new HdfsConfiguration(); @@ -1046,12 +1047,16 @@ public class TestNameNodeMXBean { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(totalSize).build(); - namesystem = cluster.getNamesystem(); - fs = cluster.getFileSystem(); + .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(1)). + numDataNodes(totalSize).build(); + cluster.waitActive(); + cluster.transitionToActive(0); + activeNn = cluster.getNamesystem(0); + standbyNn = cluster.getNamesystem(1); + fs = cluster.getFileSystem(0); fs.enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); - verifyTotalBlocksMetrics(0L, 0L, namesystem.getTotalBlocks()); + verifyTotalBlocksMetrics(0L, 0L, activeNn.getTotalBlocks()); // create small file Path replDirPath = new Path("/replicated"); @@ -1068,7 +1073,7 @@ public class TestNameNodeMXBean { final int smallLength = cellSize * dataBlocks; final byte[] smallBytes = StripedFileTestUtil.generateBytes(smallLength); DFSTestUtil.writeFile(fs, ecFileSmall, smallBytes); - verifyTotalBlocksMetrics(1L, 1L, namesystem.getTotalBlocks()); + verifyTotalBlocksMetrics(1L, 1L, activeNn.getTotalBlocks()); // create learge file Path replFileLarge = new Path(replDirPath, "replfile_large"); @@ -1079,15 +1084,20 @@ public class TestNameNodeMXBean { final int largeLength = blockSize * totalSize + smallLength; final byte[] largeBytes = StripedFileTestUtil.generateBytes(largeLength); DFSTestUtil.writeFile(fs, ecFileLarge, largeBytes); - verifyTotalBlocksMetrics(3L, 3L, namesystem.getTotalBlocks()); + verifyTotalBlocksMetrics(3L, 3L, activeNn.getTotalBlocks()); // delete replicated files fs.delete(replDirPath, true); - verifyTotalBlocksMetrics(0L, 3L, namesystem.getTotalBlocks()); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); + verifyTotalBlocksMetrics(0L, 3L, activeNn.getTotalBlocks()); // delete ec files fs.delete(ecDirPath, true); - verifyTotalBlocksMetrics(0L, 0L, namesystem.getTotalBlocks()); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); + verifyTotalBlocksMetrics(0L, 0L, activeNn.getTotalBlocks()); + verifyTotalBlocksMetrics(0L, 0L, standbyNn.getTotalBlocks()); } finally { if (fs != null) { try { @@ -1096,9 +1106,16 @@ public class TestNameNodeMXBean { throw e; } } - if (namesystem != null) { + if (activeNn != null) { try { - namesystem.close(); + activeNn.close(); + } catch (Exception 
e) { + throw e; + } + } + if (standbyNn != null) { + try { + standbyNn.close(); } catch (Exception e) { throw e; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java index 81d8f17566c..a59e07fd1d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java @@ -345,6 +345,8 @@ public class TestHASafeMode { // once it starts up banner("Removing the blocks without rolling the edit log"); fs.delete(new Path("/test"), true); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); BlockManagerTestUtil.computeAllPendingWork( nn0.getNamesystem().getBlockManager()); cluster.triggerHeartbeats(); @@ -384,6 +386,8 @@ public class TestHASafeMode { // ACKed when due to block removals. banner("Removing the blocks without rolling the edit log"); fs.delete(new Path("/test"), true); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem(0).getBlockManager()); BlockManagerTestUtil.computeAllPendingWork( nn0.getNamesystem().getBlockManager()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java index 349b7ac2411..aaedb8288e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java @@ -661,6 +661,8 @@ public class TestNameNodeMetrics { // verify ExcessBlocks metric is decremented and // excessReplicateMap is cleared after deleting a file fs.delete(file, true); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem().getBlockManager()); rb = getMetrics(NS_METRICS); assertGauge("ExcessBlocks", 0L, rb); assertEquals(0L, bm.getExcessBlocksCount()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java index 3e318b3a305..82bfa88f447 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSDirectory; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.INode; @@ -1128,7 +1129,8 @@ public class TestSnapshotDeletion { } @Test - public void testCorrectNumberOfBlocksAfterRestart() throws IOException { + public void 
testCorrectNumberOfBlocksAfterRestart() + throws IOException, InterruptedException { final Path foo = new Path("/foo"); final Path bar = new Path(foo, "bar"); final Path file = new Path(foo, "file"); @@ -1149,9 +1151,10 @@ public class TestSnapshotDeletion { hdfs.delete(bar, true); hdfs.delete(foo, true); - long numberOfBlocks = cluster.getNamesystem().getBlocksTotal(); cluster.restartNameNode(0); - assertEquals(numberOfBlocks, cluster.getNamesystem().getBlocksTotal()); + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty( + cluster.getNamesystem().getBlockManager()); + assertEquals(0, cluster.getNamesystem().getBlocksTotal()); } /*
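The change replaces the synchronous FSNamesystem#removeBlocks() loop (which dropped and re-took the namesystem write lock every blockDeletionIncrement blocks on the caller's thread) with a producer/consumer scheme: the delete, rename, truncate and snapshot-deletion paths enqueue their to-delete block lists on markedDeleteQueue, and the MarkedDeleteBlockScrubber daemon drains the queue in the background, holding the write lock for at most deleteBlockLockTimeMs (500 ms) per round and sleeping deleteBlockUnlockIntervalTimeMs (100 ms) between rounds so other lock waiters can get in. The standalone sketch below condenses that pattern outside of Hadoop; the class and method names, the ReentrantReadWriteLock standing in for the namesystem lock, and the re-queue-on-yield behaviour (the real scrubber keeps its iterator across lock releases instead) are illustrative assumptions, not the patch's code.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Simplified sketch of the queue-and-scrubber pattern: producers enqueue
 * lists of block ids, a background thread drains them under the write lock,
 * but backs off after LOCK_TIME_MS and sleeps UNLOCK_INTERVAL_MS so other
 * lock waiters can make progress. Names are illustrative, not Hadoop's.
 */
public class MarkedDeleteSketch {
  private static final long LOCK_TIME_MS = 500;        // cf. deleteBlockLockTimeMs
  private static final long UNLOCK_INTERVAL_MS = 100;  // cf. deleteBlockUnlockIntervalTimeMs

  private final ConcurrentLinkedQueue<List<Long>> markedDeleteQueue =
      new ConcurrentLinkedQueue<>();
  private final ReentrantReadWriteLock namesystemLock = new ReentrantReadWriteLock();

  /** Producer side: called where removeBlocks(toRemoveBlocks) used to be. */
  public void addBlocksToMarkedDeleteQueue(List<Long> blockIds) {
    markedDeleteQueue.add(blockIds);
  }

  /** Consumer side: one iteration of the scrubber daemon's run loop. */
  public void scrubOnce() throws InterruptedException {
    long startMs = System.nanoTime() / 1_000_000;
    boolean yielded = false;
    while (!yielded && !markedDeleteQueue.isEmpty()) {
      List<Long> batch = markedDeleteQueue.poll();
      if (batch == null) {
        break;
      }
      namesystemLock.writeLock().lock();
      try {
        for (Iterator<Long> it = batch.iterator(); it.hasNext(); ) {
          removeBlock(it.next());
          if (System.nanoTime() / 1_000_000 - startMs > LOCK_TIME_MS) {
            // Held the lock long enough; re-queue the remainder and back off.
            List<Long> rest = new ArrayList<>();
            it.forEachRemaining(rest::add);
            if (!rest.isEmpty()) {
              markedDeleteQueue.add(rest);
            }
            yielded = true;
            break;
          }
        }
      } finally {
        namesystemLock.writeLock().unlock();
      }
    }
    Thread.sleep(UNLOCK_INTERVAL_MS); // give other write-lock waiters a turn
  }

  private void removeBlock(long blockId) {
    // Placeholder for BlockManager.removeBlock(BlockInfo).
  }
}

The key property is that a delete RPC now returns after the edit-log sync and the enqueue, while the actual blocksMap and invalidation work is amortized over time by the daemon instead of being charged to the calling handler thread.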
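Because block removal is now asynchronous, any test that asserts on total block counts, invalidation work, excess blocks, locked memory or metasave output immediately after a delete, rename or truncate has to wait for the scrubber to drain the queue first, which is what the waitForMarkedDeleteQueueIsEmpty() calls added throughout the tests above do. A hypothetical, self-contained example (not part of the patch) of how the new helper is meant to be used:

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.junit.Test;

public class TestAsyncDeleteExample {
  @Test
  public void blocksSettleOnlyAfterScrubberDrainsQueue() throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DistributedFileSystem dfs = cluster.getFileSystem();
      Path file = new Path("/asyncDeleteExample");
      DFSTestUtil.createFile(dfs, file, 1024L, (short) 1, 0L);

      dfs.delete(file, true);
      // delete() now only enqueues the blocks; wait for the
      // MarkedDeleteBlockScrubber thread before asserting counts.
      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
          cluster.getNamesystem().getBlockManager());
      assertEquals(0L, cluster.getNamesystem().getBlocksTotal());
    } finally {
      cluster.shutdown();
    }
  }
}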
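One design note on the helper itself: it polls getMarkedDeleteQueue() once per second with no upper bound, so a scrubber regression that leaves the queue non-empty would hang the calling test rather than fail it. A bounded overload for BlockManagerTestUtil, sketched here as an assumption (it is not part of the patch), would turn such a hang into a clear failure:

  /**
   * Illustrative only: like waitForMarkedDeleteQueueIsEmpty(BlockManager),
   * but gives up after timeoutMs instead of waiting forever.
   */
  public static void waitForMarkedDeleteQueueIsEmpty(
      BlockManager blockManager, long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!blockManager.getMarkedDeleteQueue().isEmpty()) {
      if (System.currentTimeMillis() > deadline) {
        throw new AssertionError(
            "markedDeleteQueue not empty after " + timeoutMs + " ms");
      }
      Thread.sleep(100);
    }
  }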