diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index a786c6a2204..1d0975111d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1661,6 +1661,7 @@ public class DatanodeManager { if (pendingList != null) { cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList)); + maxTransfers -= pendingList.size(); } // check pending erasure coding tasks List pendingECList = nodeinfo diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index 30e2aaf7fd0..de002f48573 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -44,13 +44,21 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.util.Shell; import org.junit.Assert; @@ -491,4 +499,47 @@ public class TestDatanodeManager { Assert.assertEquals("Unexpected host or host in unexpected position", "127.0.0.1:23456", bothAgain.get(1).getInfoAddr()); } + + @Test + public void testPendingRecoveryTasks() throws IOException { + FSNamesystem fsn = Mockito.mock(FSNamesystem.class); + Mockito.when(fsn.hasWriteLock()).thenReturn(true); + Configuration conf = new Configuration(); + DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf)); + + int maxTransfers = 20; + int numPendingTasks = 7; + int numECTasks = maxTransfers - numPendingTasks; + + DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class); + Mockito.when(nodeInfo.isRegistered()).thenReturn(true); + Mockito.when(nodeInfo.getStorageInfos()) + .thenReturn(new DatanodeStorageInfo[0]); + + List pendingList = + Collections.nCopies(numPendingTasks, new BlockTargetPair(null, null)); + Mockito.when(nodeInfo.getReplicationCommand(maxTransfers)) + .thenReturn(pendingList); + List ecPendingList = + Collections.nCopies(numECTasks, null); + + Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks)) + .thenReturn(ecPendingList); + DatanodeRegistration dnReg = Mockito.mock(DatanodeRegistration.class); + Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo); + + DatanodeCommand[] cmds = dm.handleHeartbeat( + dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null, + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); + + assertEquals(2, cmds.length); + assertTrue(cmds[0] instanceof BlockCommand); + BlockCommand replicaCmd = (BlockCommand) cmds[0]; + assertEquals(numPendingTasks, replicaCmd.getBlocks().length); + assertEquals(numPendingTasks, replicaCmd.getTargets().length); + assertTrue(cmds[1] instanceof BlockECReconstructionCommand); + BlockECReconstructionCommand ecRecoveryCmd = + (BlockECReconstructionCommand) cmds[1]; + assertEquals(numECTasks, ecRecoveryCmd.getECTasks().size()); + } }