diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index 790cd77de97..6256dd03958 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; @@ -154,7 +155,8 @@ public class BlockInfoStriped extends BlockInfo { return -1; } - byte getStorageBlockIndex(DatanodeStorageInfo storage) { + @VisibleForTesting + public byte getStorageBlockIndex(DatanodeStorageInfo storage) { int i = this.findStorageInfo(storage); return i == -1 ? -1 : indices[i]; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java index f30066ae972..ce92151cd04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java @@ -437,6 +437,11 @@ public class DatanodeAdminManager { return monitor.numNodesChecked; } + @VisibleForTesting + public Queue getPendingNodes() { + return pendingNodes; + } + /** * Checks to see if datanodes have finished DECOMMISSION_INPROGRESS or * ENTERING_MAINTENANCE state. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index a4e43f2e1ed..4edf76bba82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -643,7 +643,9 @@ public class DatanodeDescriptor extends DatanodeInfo { /** * Store block replication work. */ - void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) { + @VisibleForTesting + public void addBlockToBeReplicated(Block block, + DatanodeStorageInfo[] targets) { assert(block != null && targets != null && targets.length > 0); replicateBlocks.offer(new BlockTargetPair(block, targets)); } @@ -701,7 +703,8 @@ public class DatanodeDescriptor extends DatanodeInfo { return erasurecodeBlocks.size(); } - int getNumberOfReplicateBlocks() { + @VisibleForTesting + public int getNumberOfReplicateBlocks() { return replicateBlocks.size(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java index 147f8cfe6e7..dcf1152918e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java @@ -164,11 +164,23 @@ class ErasureCodingWork extends BlockReconstructionWork { } private List findLeavingServiceSources() { + // Mark the block in normal node. + BlockInfoStriped block = (BlockInfoStriped)getBlock(); + BitSet bitSet = new BitSet(block.getRealTotalBlockNum()); + for (int i = 0; i < getSrcNodes().length; i++) { + if (getSrcNodes()[i].isInService()) { + bitSet.set(liveBlockIndicies[i]); + } + } + // If the block is on the node which is decommissioning or + // entering_maintenance, and it doesn't exist on other normal nodes, + // we just add the node into source list. List srcIndices = new ArrayList<>(); for (int i = 0; i < getSrcNodes().length; i++) { - if (getSrcNodes()[i].isDecommissionInProgress() || + if ((getSrcNodes()[i].isDecommissionInProgress() || (getSrcNodes()[i].isEnteringMaintenance() && - getSrcNodes()[i].isAlive())) { + getSrcNodes()[i].isAlive())) && + !bitSet.get(liveBlockIndicies[i])) { srcIndices.add(i); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java index 6303775ac85..ded7ffbc4d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java @@ -396,7 +396,8 @@ public class ProvidedStorageMap { } @Override - void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) { + public void addBlockToBeReplicated(Block block, + DatanodeStorageInfo[] targets) { // pick a random datanode, delegate to it DatanodeDescriptor node = chooseRandom(targets); if (node != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java index 7bd85b4989c..64dd17c9ebe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java @@ -27,9 +27,11 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.BitSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataOutputStream; @@ -43,14 +45,19 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; import org.junit.After; import org.junit.Assert; @@ -585,4 +592,121 @@ public class TestDecommissionWithStriped { } return null; } + + /** + * Simulate that There are 2 nodes(dn0,dn1) in decommission. Firstly dn0 + * replicates in success, dn1 replicates in failure. Decommissions go on. + */ + @Test (timeout = 120000) + public void testDecommissionWithFailedReplicating() throws Exception { + + // Write ec file. + Path ecFile = new Path(ecDir, "firstReplicationFailedFile"); + int writeBytes = cellSize * 6; + writeStripedFile(dfs, ecFile, writeBytes); + + // Get 2 nodes of ec block and set them in decommission. + // The 2 nodes are not in pendingNodes of DatanodeAdminManager. + List lbs = ((HdfsDataInputStream) dfs.open(ecFile)) + .getAllBlocks(); + LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0); + DatanodeInfo[] dnList = blk.getLocations(); + DatanodeDescriptor dn0 = bm.getDatanodeManager() + .getDatanode(dnList[0].getDatanodeUuid()); + dn0.startDecommission(); + DatanodeDescriptor dn1 = bm.getDatanodeManager() + .getDatanode(dnList[1].getDatanodeUuid()); + dn1.startDecommission(); + + assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager() + .getNumPendingNodes()); + + // Replicate dn0 block to another dn + // Simulate that dn0 replicates in success, dn1 replicates in failure. + final byte blockIndex = blk.getBlockIndices()[0]; + final Block targetBlk = new Block(blk.getBlock().getBlockId() + blockIndex, + cellSize, blk.getBlock().getGenerationStamp()); + DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk); + DatanodeDescriptor target = bm.getDatanodeManager() + .getDatanode(extraDn.getDatanodeUuid()); + dn0.addBlockToBeReplicated(targetBlk, + new DatanodeStorageInfo[] {target.getStorageInfos()[0]}); + + // dn0 replicates in success + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return dn0.getNumberOfReplicateBlocks() == 0; + } + }, 100, 60000); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + Iterator it = + bm.getStoredBlock(targetBlk).getStorageInfos(); + while(it.hasNext()) { + if (it.next().getDatanodeDescriptor().equals(target)) { + return true; + } + } + return false; + } + }, 100, 60000); + + // There are 8 live replicas + BlockInfoStriped blockInfo = + (BlockInfoStriped)bm.getStoredBlock( + new Block(blk.getBlock().getBlockId())); + assertEquals(8, bm.countNodes(blockInfo).liveReplicas()); + + // Add the 2 nodes to pendingNodes of DatanodeAdminManager + bm.getDatanodeManager().getDatanodeAdminManager() + .getPendingNodes().add(dn0); + bm.getDatanodeManager().getDatanodeAdminManager() + .getPendingNodes().add(dn1); + + waitNodeState(dn0, AdminStates.DECOMMISSIONED); + waitNodeState(dn1, AdminStates.DECOMMISSIONED); + + // There are 9 live replicas + assertEquals(9, bm.countNodes(blockInfo).liveReplicas()); + + // After dn0 & dn1 decommissioned, all internal Blocks(0~8) are there + Iterator it = blockInfo.getStorageInfos(); + BitSet indexBitSet = new BitSet(9); + while(it.hasNext()) { + DatanodeStorageInfo storageInfo = it.next(); + if(storageInfo.getDatanodeDescriptor().equals(dn0) + || storageInfo.getDatanodeDescriptor().equals(dn1)) { + // Skip decommissioned nodes + continue; + } + byte index = blockInfo.getStorageBlockIndex(storageInfo); + indexBitSet.set(index); + } + for (int i = 0; i < 9; ++i) { + assertEquals(true, indexBitSet.get(i)); + } + } + + /** + * Get a Datanode which does not contain the block. + */ + private DatanodeInfo getDatanodeOutOfTheBlock(LocatedStripedBlock blk) + throws Exception { + DatanodeInfo[] allDnInfos = client.datanodeReport(DatanodeReportType.LIVE); + DatanodeInfo[] blkDnInos= blk.getLocations(); + for (DatanodeInfo dnInfo : allDnInfos) { + boolean in = false; + for (DatanodeInfo blkDnInfo : blkDnInos) { + if (blkDnInfo.equals(dnInfo)) { + in = true; + } + } + if(!in) { + return dnInfo; + } + } + return null; + } }