diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index f8fd7890c2e..ca0942db273 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -1111,15 +1112,15 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeStorageInfo[] expectedStorages =
           blk.getUnderConstructionFeature().getExpectedStorageLocations();
       if (expectedStorages.length - blk.numNodes() > 0) {
-        ArrayList<DatanodeDescriptor> pendingNodes = new ArrayList<>();
+        ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
         for (DatanodeStorageInfo storage : expectedStorages) {
           DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
           if (blk.findStorageInfo(dnd) == null) {
-            pendingNodes.add(dnd);
+            pendingNodes.add(storage);
           }
         }
         pendingReconstruction.increment(blk,
-            pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
+            pendingNodes.toArray(new DatanodeStorageInfo[pendingNodes.size()]));
       }
     }
   }
@@ -2169,8 +2170,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Move the block-replication into a "pending" state.
     // The reason we use 'pending' is so we can retry
     // reconstructions that fail after an appropriate amount of time.
-    pendingReconstruction.increment(block,
-        DatanodeStorageInfo.toDatanodeDescriptors(targets));
+    pendingReconstruction.increment(block, targets);
     blockLog.debug("BLOCK* block {} is moved from neededReconstruction to "
         + "pendingReconstruction", block);
 
@@ -4037,7 +4037,7 @@
     BlockInfo storedBlock = getStoredBlock(block);
     if (storedBlock != null &&
         block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
-      if (pendingReconstruction.decrement(storedBlock, node)) {
+      if (pendingReconstruction.decrement(storedBlock, storageInfo)) {
         NameNode.getNameNodeMetrics().incSuccessfulReReplications();
       }
     }
@@ -4452,7 +4452,11 @@
     addToInvalidates(block);
     removeBlockFromMap(block);
     // Remove the block from pendingReconstruction and neededReconstruction
-    pendingReconstruction.remove(block);
+    PendingBlockInfo remove = pendingReconstruction.remove(block);
+    if (remove != null) {
+      DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
+          .toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
+    }
     neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
     postponedMisreplicatedBlocks.remove(block);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 59c17a87963..288bbb596db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1688,8 +1688,24 @@ public class DatanodeManager {
       List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
           numReplicationTasks);
       if (pendingList != null && !pendingList.isEmpty()) {
-        cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
-            pendingList));
+        // If a block has been deleted, its block size will have been set to
+        // BlockCommand.NO_ACK (Long.MAX_VALUE), so this kind of block does
+        // not need to be sent for replication or
+        // reconstruction.
+        Iterator<BlockTargetPair> iterator = pendingList.iterator();
+        while (iterator.hasNext()) {
+          BlockTargetPair cmd = iterator.next();
+          if (cmd.block != null
+              && cmd.block.getNumBytes() == BlockCommand.NO_ACK) {
+            // block deleted
+            DatanodeStorageInfo.decrementBlocksScheduled(cmd.targets);
+            iterator.remove();
+          }
+        }
+        if (!pendingList.isEmpty()) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+              pendingList));
+        }
       }
       // check pending erasure coding tasks
       List<BlockECReconstructionInfo> pendingECList = nodeinfo
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
index fb93d829c5c..6e1af5729aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
@@ -81,7 +81,7 @@ class PendingReconstructionBlocks {
    * @param block The corresponding block
    * @param targets The DataNodes where replicas of the block should be placed
    */
-  void increment(BlockInfo block, DatanodeDescriptor... targets) {
+  void increment(BlockInfo block, DatanodeStorageInfo... targets) {
     synchronized (pendingReconstructions) {
       PendingBlockInfo found = pendingReconstructions.get(block);
       if (found == null) {
@@ -101,7 +101,7 @@
    * @param dn The DataNode that finishes the reconstruction
    * @return true if the block is decremented to 0 and got removed.
    */
-  boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
+  boolean decrement(BlockInfo block, DatanodeStorageInfo dn) {
     boolean removed = false;
     synchronized (pendingReconstructions) {
       PendingBlockInfo found = pendingReconstructions.get(block);
@@ -124,9 +124,9 @@
    *          The given block whose pending reconstruction requests need to be
    *          removed
    */
-  void remove(BlockInfo block) {
+  PendingBlockInfo remove(BlockInfo block) {
     synchronized (pendingReconstructions) {
-      pendingReconstructions.remove(block);
+      return pendingReconstructions.remove(block);
     }
   }
 
@@ -200,11 +200,11 @@
    */
   static class PendingBlockInfo {
     private long timeStamp;
-    private final List<DatanodeDescriptor> targets;
+    private final List<DatanodeStorageInfo> targets;
 
-    PendingBlockInfo(DatanodeDescriptor[] targets) {
+    PendingBlockInfo(DatanodeStorageInfo[] targets) {
       this.timeStamp = monotonicNow();
-      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+      this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>()
           : new ArrayList<>(Arrays.asList(targets));
     }
 
@@ -216,9 +216,9 @@
       timeStamp = monotonicNow();
     }
 
-    void incrementReplicas(DatanodeDescriptor... newTargets) {
+    void incrementReplicas(DatanodeStorageInfo... newTargets) {
       if (newTargets != null) {
-        for (DatanodeDescriptor newTarget : newTargets) {
+        for (DatanodeStorageInfo newTarget : newTargets) {
           if (!targets.contains(newTarget)) {
             targets.add(newTarget);
           }
@@ -226,13 +226,23 @@
       }
     }
 
-    void decrementReplicas(DatanodeDescriptor dn) {
-      targets.remove(dn);
+    void decrementReplicas(DatanodeStorageInfo dn) {
+      Iterator<DatanodeStorageInfo> iterator = targets.iterator();
+      while (iterator.hasNext()) {
+        DatanodeStorageInfo next = iterator.next();
+        if (next.getDatanodeDescriptor() == dn.getDatanodeDescriptor()) {
+          iterator.remove();
+        }
+      }
     }
 
     int getNumReplicas() {
       return targets.size();
     }
+
+    List<DatanodeStorageInfo> getTargets() {
+      return targets;
+    }
   }
 
   /*
@@ -318,4 +328,14 @@
       }
     }
   }
+
+  List<DatanodeStorageInfo> getTargets(BlockInfo block) {
+    synchronized (pendingReconstructions) {
+      PendingBlockInfo found = pendingReconstructions.get(block);
+      if (found != null) {
+        return found.targets;
+      }
+    }
+    return null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
index 18942780e31..95f4e2c5973 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
@@ -22,11 +22,19 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.After;
 import org.junit.Test;
@@ -129,4 +137,69 @@ public class TestBlocksScheduledCounter {
         0, descriptor.getBlocksScheduled());
     }
   }
-}
+
+  /**
+   * Test that the blocks scheduled counter is decremented when a file with
+   * scheduled blocks is deleted.
+   * @throws Exception
+   */
+  @Test
+  public void testScheduledBlocksCounterDecrementOnDeletedBlock()
+      throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
+    cluster.waitActive();
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      // 1. create a file
+      Path filePath = new Path("/tmp.txt");
+      DFSTestUtil.createFile(dfs, filePath, 1024, (short) 3, 0L);
+
+      // 2. disable the heartbeats
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+      }
+
+      DatanodeManager datanodeManager =
+          cluster.getNamesystem().getBlockManager().getDatanodeManager();
+      ArrayList<DatanodeDescriptor> dnList =
+          new ArrayList<DatanodeDescriptor>();
+      datanodeManager.fetchDatanodes(dnList, dnList, false);
+
+      // 3. mark a couple of blocks as corrupt
+      LocatedBlock block = NameNodeAdapter
+          .getBlockLocations(cluster.getNameNode(), filePath.toString(), 0, 1)
+          .get(0);
+      DatanodeInfo[] locs = block.getLocations();
+      cluster.getNamesystem().writeLock();
+      try {
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[0], "STORAGE_ID",
+            "TEST");
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[1], "STORAGE_ID",
+            "TEST");
+        BlockManagerTestUtil.computeAllPendingWork(bm);
+        BlockManagerTestUtil.updateState(bm);
+        assertEquals(1L, bm.getPendingReconstructionBlocksCount());
+      } finally {
+        cluster.getNamesystem().writeUnlock();
+      }
+
+      // 4. delete the file
+      dfs.delete(filePath, true);
+      int blocksScheduled = 0;
+      for (DatanodeDescriptor descriptor : dnList) {
+        if (descriptor.getBlocksScheduled() != 0) {
+          blocksScheduled += descriptor.getBlocksScheduled();
+        }
+      }
+      assertEquals(0, blocksScheduled);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
index dc37ec062ef..340f467ecb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
@@ -85,8 +85,7 @@ public class TestPendingReconstruction {
       BlockInfo block = genBlockInfo(i, i, 0);
       DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
       System.arraycopy(storages, 0, targets, 0, i);
-      pendingReconstructions.increment(block,
-          DatanodeStorageInfo.toDatanodeDescriptors(targets));
+      pendingReconstructions.increment(block, targets);
     }
     assertEquals("Size of pendingReconstruction ", 10,
         pendingReconstructions.size());
@@ -96,25 +95,24 @@
     // remove one item
     //
     BlockInfo blk = genBlockInfo(8, 8, 0);
-    pendingReconstructions.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
+    pendingReconstructions.decrement(blk, storages[7]); // removes one replica
     assertEquals("pendingReconstructions.getNumReplicas ", 7,
         pendingReconstructions.getNumReplicas(blk));
 
     //
     // insert the same item twice should be counted as once
     //
-    pendingReconstructions.increment(blk, storages[0].getDatanodeDescriptor());
+    pendingReconstructions.increment(blk, storages[0]);
     assertEquals("pendingReconstructions.getNumReplicas ", 7,
         pendingReconstructions.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
       // removes all replicas
-      pendingReconstructions.decrement(blk, storages[i].getDatanodeDescriptor());
+      pendingReconstructions.decrement(blk, storages[i]);
     }
     assertTrue(pendingReconstructions.size() == 9);
     pendingReconstructions.increment(blk,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(8)));
+        DFSTestUtil.createDatanodeStorageInfos(8));
     assertTrue(pendingReconstructions.size() == 10);
 
     //
@@ -144,8 +142,7 @@
     for (int i = 10; i < 15; i++) {
       BlockInfo block = genBlockInfo(i, i, 0);
       pendingReconstructions.increment(block,
-          DatanodeStorageInfo.toDatanodeDescriptors(
-              DFSTestUtil.createDatanodeStorageInfos(i)));
+          DFSTestUtil.createDatanodeStorageInfos(i));
     }
     assertEquals(15, pendingReconstructions.size());
     assertEquals(0L, pendingReconstructions.getNumTimedOuts());
@@ -213,8 +210,7 @@
     blockInfo = new BlockInfoContiguous(block, (short) 3);
 
     pendingReconstruction.increment(blockInfo,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(1)));
+        DFSTestUtil.createDatanodeStorageInfos(1));
     BlockCollection bc = Mockito.mock(BlockCollection.class);
     // Place into blocksmap with GenerationStamp = 1
     blockInfo.setGenerationStamp(1);
@@ -230,8 +226,7 @@
     block = new Block(2, 2, 0);
     blockInfo = new BlockInfoContiguous(block, (short) 3);
     pendingReconstruction.increment(blockInfo,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(1)));
+        DFSTestUtil.createDatanodeStorageInfos(1));
 
     // verify 2 blocks in pendingReconstructions
     assertEquals("Size of pendingReconstructions ", 2,
@@ -277,7 +272,8 @@
         getDatanodes().iterator().next() };
 
     // Add a stored block to the pendingReconstruction.
-    pendingReconstruction.increment(storedBlock, desc);
+    pendingReconstruction.increment(blockInfo,
+        DFSTestUtil.createDatanodeStorageInfos(1));
     assertEquals("Size of pendingReconstructions ", 1,
         pendingReconstruction.size());
@@ -306,6 +302,8 @@
       fsn.writeUnlock();
     }
 
+    GenericTestUtils.waitFor(() -> pendingReconstruction.size() == 0, 500,
+        10000);
     // The pending queue should be empty.
     assertEquals("Size of pendingReconstructions ", 0,
         pendingReconstruction.size());