diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4c757338265..3b76eec0d4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -854,7 +854,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
     // source node returned is not used
     chooseSourceDatanodes(blockInfo, containingNodes,
-        containingLiveReplicasNodes, numReplicas,
+        containingLiveReplicasNodes, numReplicas, new ArrayList<Byte>(),
         new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);
 
     // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
@@ -2024,9 +2024,10 @@ public class BlockManager implements BlockStatsMXBean {
     List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
     NumberReplicas numReplicas = new NumberReplicas();
     List<Byte> liveBlockIndices = new ArrayList<>();
+    List<Byte> liveBusyBlockIndices = new ArrayList<>();
     final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
         containingNodes, liveReplicaNodes, numReplicas,
-        liveBlockIndices, priority);
+        liveBlockIndices, liveBusyBlockIndices, priority);
     short requiredRedundancy = getExpectedLiveRedundancyNum(block,
         numReplicas);
     if(srcNodes == null || srcNodes.length == 0) {
@@ -2079,9 +2080,13 @@ public class BlockManager implements BlockStatsMXBean {
       for (int i = 0 ; i < liveBlockIndices.size(); i++) {
         indices[i] = liveBlockIndices.get(i);
       }
+      byte[] busyIndices = new byte[liveBusyBlockIndices.size()];
+      for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
+        busyIndices[i] = liveBusyBlockIndices.get(i);
+      }
       return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
           containingNodes, liveReplicaNodes, additionalReplRequired,
-          priority, indices);
+          priority, indices, busyIndices);
     } else {
       return new ReplicationWork(block, bc, srcNodes, containingNodes,
           liveReplicaNodes, additionalReplRequired,
@@ -2293,8 +2298,8 @@ public class BlockManager implements BlockStatsMXBean {
   DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeStorageInfo> nodesContainingLiveReplicas,
-      NumberReplicas numReplicas,
-      List<Byte> liveBlockIndices, int priority) {
+      NumberReplicas numReplicas, List<Byte> liveBlockIndices,
+      List<Byte> liveBusyBlockIndices, int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     List<DatanodeDescriptor> srcNodes = new ArrayList<>();
@@ -2347,12 +2352,6 @@ public class BlockManager implements BlockStatsMXBean {
         continue;
       }
 
-      if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
-          && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
-          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
-        continue; // already reached replication limit
-      }
-
       // for EC here need to make sure the numReplicas replicates state correct
       // because in the scheduleReconstruction it need the numReplicas to check
       // whether need to reconstruct the ec internal block
@@ -2364,7 +2363,19 @@ public class BlockManager implements BlockStatsMXBean {
             liveBitSet, decommissioningBitSet, blockIndex);
       }
 
+      if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
+          && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
+        if (isStriped && state == StoredReplicaState.LIVE) {
+          liveBusyBlockIndices.add(blockIndex);
+        }
+        continue; // already reached replication limit
+      }
+
       if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
+        if (isStriped && state == StoredReplicaState.LIVE) {
+          liveBusyBlockIndices.add(blockIndex);
+        }
         continue;
       }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 9fa1de8e27f..9035fd36f41 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -634,7 +634,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return new BlockIterator(startBlock, getStorageInfos());
   }
 
-  void incrementPendingReplicationWithoutTargets() {
+  @VisibleForTesting
+  public void incrementPendingReplicationWithoutTargets() {
     pendingReplicationWithoutTargets++;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 5d0629350a7..8de3f381ddf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -31,6 +31,7 @@ import java.util.Set;
 
 class ErasureCodingWork extends BlockReconstructionWork {
   private final byte[] liveBlockIndicies;
+  private final byte[] liveBusyBlockIndicies;
   private final String blockPoolId;
 
   public ErasureCodingWork(String blockPoolId, BlockInfo block,
@@ -38,12 +39,13 @@ class ErasureCodingWork extends BlockReconstructionWork {
       DatanodeDescriptor[] srcNodes,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeStorageInfo> liveReplicaStorages,
-      int additionalReplRequired,
-      int priority, byte[] liveBlockIndicies) {
+      int additionalReplRequired, int priority,
+      byte[] liveBlockIndicies, byte[] liveBusyBlockIndicies) {
     super(block, bc, srcNodes, containingNodes,
         liveReplicaStorages, additionalReplRequired, priority);
     this.blockPoolId = blockPoolId;
     this.liveBlockIndicies = liveBlockIndicies;
+    this.liveBusyBlockIndicies = liveBusyBlockIndicies;
     LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
         block);
   }
@@ -70,13 +72,17 @@ class ErasureCodingWork extends BlockReconstructionWork {
    */
   private boolean hasAllInternalBlocks() {
     final BlockInfoStriped block = (BlockInfoStriped) getBlock();
-    if (getSrcNodes().length < block.getRealTotalBlockNum()) {
+    if (liveBlockIndicies.length +
+        liveBusyBlockIndicies.length < block.getRealTotalBlockNum()) {
       return false;
     }
     BitSet bitSet = new BitSet(block.getTotalBlockNum());
     for (byte index : liveBlockIndicies) {
       bitSet.set(index);
     }
+    for (byte busyIndex: liveBusyBlockIndicies) {
+      bitSet.set(busyIndex);
+    }
     for (int i = 0; i < block.getRealDataBlockNum(); i++) {
       if (!bitSet.get(i)) {
         return false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index 0031878fa19..f4a99e9062e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -40,14 +40,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.security.token.Token;
@@ -81,6 +84,9 @@ public class TestDecommissionWithStriped {
   // replication interval
   private static final int NAMENODE_REPLICATION_INTERVAL = 1;
 
+  private int replicationStreamsHardLimit =
+      DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT;
+
   private Path decommissionDir;
   private Path hostsFile;
   private Path excludeFile;
@@ -273,7 +279,6 @@ public class TestDecommissionWithStriped {
         fsn.getNumDecomLiveDataNodes());
 
     // Ensure decommissioned datanode is not automatically shutdown
-    DFSClient client = getDfsClient(cluster.getNameNode(0), conf);
     assertEquals("All datanodes must be alive", numDNs,
         client.datanodeReport(DatanodeReportType.LIVE).length);
 
@@ -283,6 +288,65 @@ public class TestDecommissionWithStriped {
     cleanupFile(dfs, ecFile);
   }
 
+  /**
+   * DN decommission shouldn't reconstruct the block held by a busy DN.
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testDecommissionWithBusyNode() throws Exception {
+    byte busyDNIndex = 1;
+    byte decommisionDNIndex = 0;
+    //1. create EC file
+    final Path ecFile = new Path(ecDir, "testDecommissionWithBusyNode");
+    int writeBytes = cellSize * dataBlocks;
+    writeStripedFile(dfs, ecFile, writeBytes);
+    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    //2. make one DN busy
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(ecFile.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
+    DatanodeDescriptor busyNode =
+        dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
+    for (int j = 0; j < replicationStreamsHardLimit; j++) {
+      busyNode.incrementPendingReplicationWithoutTargets();
+    }
+
+    //3. decommission one node
+    List<DatanodeInfo> decommisionNodes = new ArrayList<>();
+    decommisionNodes.add(
+        dnStorageInfos[decommisionDNIndex].getDatanodeDescriptor());
+    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
+    assertEquals(decommisionNodes.size(), fsn.getNumDecomLiveDataNodes());
+
+    //4. wait for the decommissioned block to be reconstructed
+    Thread.sleep(3000);
+    DatanodeStorageInfo[] newDnStorageInfos = bm.getStorages(firstBlock);
+    Assert.assertEquals("Busy DN shouldn't be reconstructed",
+        dnStorageInfos[busyDNIndex].getStorageID(),
+        newDnStorageInfos[busyDNIndex].getStorageID());
+
+    //5. check the decommissioned DN's block index; it should be reconstructed again
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        ecFile.toString(), 0, writeBytes);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    int decommissionBlockIndexCount = 0;
+    for (byte index : bg.getBlockIndices()) {
+      if (index == decommisionDNIndex) {
+        decommissionBlockIndexCount++;
+      }
+    }
+
+    Assert.assertEquals("Decommission DN block should be reconstructed", 2,
+        decommissionBlockIndexCount);
+
+    FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
+    Assert.assertTrue("Checksum mismatches!",
+        fileChecksum1.equals(fileChecksum2));
+  }
+
   /**
    * Tests to verify that the file checksum should be able to compute after the
    * decommission operation.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index ba88afe2591..237cacc7c97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -660,6 +660,7 @@ public class TestBlockManager {
             liveNodes,
             new NumberReplicas(),
             new ArrayList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
 
     assertEquals("Does not choose a source node for a less-than-highest-priority"
@@ -671,6 +672,7 @@ public class TestBlockManager {
             liveNodes,
             new NumberReplicas(),
             new ArrayList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY).length);
 
     // Increase the replication count to test replication count > hard limit
@@ -685,6 +687,7 @@ public class TestBlockManager {
             liveNodes,
             new NumberReplicas(),
             new ArrayList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length);
   }
 
@@ -730,13 +733,15 @@ public class TestBlockManager {
     List<DatanodeStorageInfo> liveNodes = new LinkedList<DatanodeStorageInfo>();
     NumberReplicas numReplicas = new NumberReplicas();
     List<Byte> liveBlockIndices = new ArrayList<>();
+    List<Byte> liveBusyBlockIndices = new ArrayList<>();
     bm.chooseSourceDatanodes(
         aBlockInfoStriped,
         cntNodes,
         liveNodes,
         numReplicas,
         liveBlockIndices,
-        LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+        liveBusyBlockIndices,
+        LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
 
     assertEquals("Choose the source node for reconstruction with one node reach"
         + " the MAX maxReplicationStreams, the numReplicas still return the"
@@ -791,12 +796,14 @@ public class TestBlockManager {
         new LinkedList<DatanodeStorageInfo>();
     NumberReplicas numReplicas = new NumberReplicas();
     List<Byte> liveBlockIndices = new ArrayList<>();
+    List<Byte> liveBusyBlockIndices = new ArrayList<>();
     bm.chooseSourceDatanodes(
         aBlockInfoStriped,
         containingNodes,
         nodesContainingLiveReplicas,
         numReplicas,
         liveBlockIndices,
+        liveBusyBlockIndices,
         LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
     assertEquals("There are 5 live replicas in "
         + "[ds2, ds3, ds4, ds5, ds6] datanodes ",
@@ -828,7 +835,9 @@ public class TestBlockManager {
             bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
-            new NumberReplicas(), new LinkedList<Byte>(),
+            new NumberReplicas(),
+            new LinkedList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY)[0]);
 
 
@@ -842,7 +851,9 @@ public class TestBlockManager {
             bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
-            new NumberReplicas(), new LinkedList<Byte>(),
+            new NumberReplicas(),
+            new LinkedList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY).length);
   }