From 5941a91f64882e9717a37c431f2632cd558b6558 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Wed, 6 Nov 2019 22:10:27 +0530
Subject: [PATCH] HDFS-14946. Erasure Coding: Block recovery failed during
 decommissioning. Contributed by Fei Hui.

---
 .../server/blockmanagement/BlockManager.java | 39 ++++++--
 .../hdfs/TestDecommissionWithStriped.java    | 88 +++++++++++++++++--
 2 files changed, 116 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a291707ffe0..41575cc885e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2079,13 +2079,14 @@ public class BlockManager implements BlockStatsMXBean {
             numReplicas.decommissioning() -
             numReplicas.liveEnteringMaintenanceReplicas();
       }
-      byte[] indices = new byte[liveBlockIndices.size()];
-      for (int i = 0 ; i < liveBlockIndices.size(); i++) {
-        indices[i] = liveBlockIndices.get(i);
-      }
-      return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
+      final DatanodeDescriptor[] newSrcNodes =
+          new DatanodeDescriptor[srcNodes.length];
+      byte[] newIndices = new byte[liveBlockIndices.size()];
+      adjustSrcNodesAndIndices((BlockInfoStriped)block,
+          srcNodes, liveBlockIndices, newSrcNodes, newIndices);
+      return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
           containingNodes, liveReplicaNodes, additionalReplRequired,
-          priority, indices);
+          priority, newIndices);
     } else {
       return new ReplicationWork(block, bc, srcNodes, containingNodes,
           liveReplicaNodes, additionalReplRequired,
@@ -2093,6 +2094,32 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  /**
+   * Adjust srcNodes and indices which are used to reconstruct the block.
+   * We should guarantee that the indices of the first minRequiredSources
+   * nodes are distinct.
+   */
+  private void adjustSrcNodesAndIndices(BlockInfoStriped block,
+      DatanodeDescriptor[] srcNodes, List<Byte> indices,
+      DatanodeDescriptor[] newSrcNodes, byte[] newIndices) {
+    BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
+    List<Integer> skipIndexList = new ArrayList<>();
+    for (int i = 0, j = 0; i < srcNodes.length; i++) {
+      if (!bitSet.get(indices.get(i))) {
+        bitSet.set(indices.get(i));
+        newSrcNodes[j] = srcNodes[i];
+        newIndices[j++] = indices.get(i);
+      } else {
+        skipIndexList.add(i);
+      }
+    }
+    for (int i = srcNodes.length - skipIndexList.size(), j = 0;
+        i < srcNodes.length; i++, j++) {
+      newSrcNodes[i] = srcNodes[skipIndexList.get(j)];
+      newIndices[i] = indices.get(skipIndexList.get(j));
+    }
+  }
+
   private boolean validateReconstructionWork(BlockReconstructionWork rw) {
     BlockInfo block = rw.getBlock();
     int priority = rw.getPriority();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index b375321e5fa..5cbb84a6ab5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -436,14 +436,15 @@ public class TestDecommissionWithStriped {
     return new DFSClient(nn.getNameNodeAddress(), conf);
   }
 
-  private void writeStripedFile(DistributedFileSystem dfs, Path ecFile,
-      int writeBytes) throws IOException, Exception {
+  private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile,
+      int writeBytes) throws Exception {
     byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
-    DFSTestUtil.writeFile(dfs, ecFile, new String(bytes));
-    StripedFileTestUtil.waitBlockGroupsReported(dfs, ecFile.toString());
+    DFSTestUtil.writeFile(fs, ecFile, new String(bytes));
+    StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
 
-    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes,
+    StripedFileTestUtil.checkData(fs, ecFile, writeBytes,
         new ArrayList<DatanodeInfo>(), null, blockGroupSize);
+    return bytes;
   }
 
   private void writeConfigFile(Path name, List<String> nodes)
@@ -894,4 +895,81 @@ public class TestDecommissionWithStriped {
     assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
     cleanupFile(dfs, ecFile);
   }
+
+  /**
+   * Test recovery for an EC block whose storage array contains the internal
+   * blocks {b0, b1, b2, b3, null, b5, b6, b7, b8, b0, b1, b2, b3}:
+   * array[0]{b0} is decommissioning, array[1-3]{b1, b2, b3} are
+   * decommissioned, array[4] is null, and array[5-12]{b[5-8], b[0-3]}
+   * are live.
+   */
+  @Test (timeout = 120000)
+  public void testRecoveryWithDecommission() throws Exception {
+    final Path ecFile = new Path(ecDir, "testRecoveryWithDecommission");
+    int writeBytes = cellSize * dataBlocks;
+    byte[] originBytesArray = writeStripedFile(dfs, ecFile, writeBytes);
+    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
+        .getAllBlocks();
+    LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
+    DatanodeInfo[] dnList = blk.getLocations();
+    BlockInfoStriped blockInfo =
+        (BlockInfoStriped)bm.getStoredBlock(
+            new Block(blk.getBlock().getBlockId()));
+
+    // Decommission datanode dn0, which contains block b0
+    // Aim to add the storage info of the replicated block b0 to
+    // storages[9] of the ec block
+    List<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+    decommissionedNodes.add(dnList[0]);
+    decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
+
+    // Now the storages of the ec block are (b0{decommissioned},
+    // b[1-8]{live}, b0{live})
+    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+    assertEquals(1, bm.countNodes(blockInfo).decommissioned());
+
+    int decommissionNodesNum = 4;
+
+    // Decommission the nodes containing blocks b[0-3]
+    // dn0 has already been decommissioned
+    for (int i = 1; i < decommissionNodesNum; i++) {
+      decommissionedNodes.add(dnList[i]);
+    }
+    decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
+
+    // Now the storages of the ec block are (b[0-3]{decommissioned},
+    // b[4-8]{live}, b0{live}, b[1-3]{live})
+    // There are 9 live and 4 decommissioned internal blocks
+    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+    assertEquals(4, bm.countNodes(blockInfo).decommissioned());
+
+    // There are no reconstruction tasks
+    assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
+        .getNumPendingNodes());
+    assertEquals(0, bm.getUnderReplicatedNotMissingBlocks());
+
+    // Set dn0 back to decommissioning
+    // so that the block on dn0 can be used for the reconstruction task
+    DatanodeDescriptor dn0 = bm.getDatanodeManager()
+        .getDatanode(dnList[0].getDatanodeUuid());
+    dn0.startDecommission();
+
+    // Stop the datanode that contains b4
+    DataNode dn = cluster.getDataNode(
+        dnList[decommissionNodesNum].getIpcPort());
+    cluster.stopDataNode(dnList[decommissionNodesNum].getXferAddr());
+    cluster.setDataNodeDead(dn.getDatanodeId());
+
+    // Now the storages of the ec block are (b[0]{decommissioning},
+    // b[1-3]{decommissioned}, null, b[5-8]{live}, b0{live}, b[1-3]{live})
+    // There are 8 live and 1 decommissioning internal blocks
+    // Wait for the EC block to be reconstructed
+    GenericTestUtils.waitFor(
+        () -> bm.countNodes(blockInfo).liveReplicas() == 9,
+        100, 10000);
+
+    byte[] readBytesArray = new byte[writeBytes];
+    StripedFileTestUtil.verifyPread(dfs, ecFile, writeBytes,
+        originBytesArray, readBytesArray, ecPolicy);
+    cleanupFile(dfs, ecFile);
+  }
 }
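
For reference, a standalone sketch (not part of the patch) of the reordering that adjustSrcNodesAndIndices performs: sources carrying a block index already seen (e.g. a decommissioning replica duplicating a live one) are deferred to the tail so the first entries all have distinct indices. The class name, the String stand-ins for DatanodeDescriptor, and the main driver are illustrative assumptions only.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

public class AdjustSrcNodesDemo {

  // Reorder sources so the leading entries carry distinct block indices;
  // sources whose index was already seen are appended after them.
  static void adjust(String[] srcNodes, List<Byte> indices, int totalBlocks,
      String[] newSrcNodes, byte[] newIndices) {
    BitSet seen = new BitSet(totalBlocks);
    List<Integer> skipped = new ArrayList<>();
    int j = 0;
    for (int i = 0; i < srcNodes.length; i++) {
      byte idx = indices.get(i);
      if (!seen.get(idx)) {
        seen.set(idx);
        newSrcNodes[j] = srcNodes[i];
        newIndices[j++] = idx;
      } else {
        skipped.add(i); // duplicate index, defer to the tail
      }
    }
    for (int k : skipped) {
      newSrcNodes[j] = srcNodes[k];
      newIndices[j++] = indices.get(k);
    }
  }

  public static void main(String[] args) {
    // Two sources report index 0: a decommissioning replica and a live one.
    String[] src = {"dn0", "dn1", "dn2", "dn9"};
    List<Byte> idx = Arrays.asList((byte) 0, (byte) 1, (byte) 2, (byte) 0);
    String[] newSrc = new String[src.length];
    byte[] newIdx = new byte[idx.size()];
    adjust(src, idx, 9, newSrc, newIdx);
    // Prints distinct indices first, the duplicate (dn9, index 0) last:
    // [dn0, dn1, dn2, dn9] [0, 1, 2, 0]
    System.out.println(Arrays.toString(newSrc) + " " + Arrays.toString(newIdx));
  }
}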