diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 93c3162f352..8403d1adc90 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -353,3 +353,6 @@
 
     HDFS-8734. Erasure Coding: fix one cell need two packets. (Walter Su via
     jing9)
+
+    HDFS-8619. Erasure Coding: revisit replica counting for striped blocks.
+    (Jing Zhao via yliu)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index 61068b96dc7..82aa3489cd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -178,6 +178,9 @@ public abstract class BlockInfo extends Block
 
   public abstract boolean isStriped();
 
+  /** @return true if there is no datanode storage associated with the block */
+  abstract boolean hasNoStorage();
+
   /**
    * Find specified DatanodeDescriptor.
    * @return index or -1 if not found.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index 5199101267d..dfca8ea45af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -150,4 +150,9 @@ public class BlockInfoContiguous extends BlockInfo {
   public final boolean isStriped() {
     return false;
   }
+
+  @Override
+  final boolean hasNoStorage() {
+    return getStorageInfo(0) == null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 07e29f8a713..66745101c08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -272,4 +272,15 @@ public class BlockInfoStriped extends BlockInfo {
     }
     return ucBlock;
   }
+
+  @Override
+  final boolean hasNoStorage() {
+    final int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      if (getStorageInfo(idx) != null) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1aaf22569c4..b641fa2ac58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1235,10 +1235,11 @@ public class BlockManager {
   }
 
   /**
-   *
-   * @param b
+   * Mark a replica (of a contiguous block) or an internal block (of a striped
+   * block group) as corrupt.
+   * @param b Indicating the reported bad block and the corresponding BlockInfo
+   *          stored in blocksMap.
    * @param storageInfo storage that contains the block, if known. null otherwise.
-   * @throws IOException
    */
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
       DatanodeStorageInfo storageInfo,
@@ -1258,8 +1259,13 @@
       storageInfo.addBlock(b.stored, b.corrupted);
     }
 
-    // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+    // Add this replica to corruptReplicas Map. For striped blocks, we always
+    // use the id of whole striped block group when adding to corruptReplicas
+    Block corrupted = new Block(b.corrupted);
+    if (b.stored.isStriped()) {
+      corrupted.setBlockId(b.stored.getBlockId());
+    }
+    corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.reason,
         b.reasonCode);
 
     NumberReplicas numberOfReplicas = countNodes(b.stored);
@@ -1283,7 +1289,7 @@
     if (hasEnoughLiveReplicas || hasMoreCorruptReplicas ||
         corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
-      invalidateBlock(b, node);
+      invalidateBlock(b, node, numberOfReplicas);
     } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
       updateNeededReplications(b.stored, -1, 0);
@@ -1295,8 +1301,8 @@
    * @return true if the block was successfully invalidated and no longer
    *         present in the BlocksMap
    */
-  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
-      ) throws IOException {
+  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
+      NumberReplicas nr) throws IOException {
     blockLog.info("BLOCK* invalidateBlock: {} on {}", b, dn);
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -1305,7 +1311,6 @@
     }
 
     // Check how many copies we have of the block
-    NumberReplicas nr = countNodes(b.stored);
     if (nr.replicasOnStaleNodes() > 0) {
       blockLog.info("BLOCK* invalidateBlocks: postponing " +
           "invalidation of {} on {} because {} replica(s) are located on " +
@@ -1313,17 +1318,14 @@
           nr.replicasOnStaleNodes());
       postponeBlock(b.corrupted);
       return false;
-    } else if (nr.liveReplicas() >= 1) {
-      // If we have at least one copy on a live node, then we can delete it.
+    } else {
+      // we already checked the number of replicas in the caller of this
+      // function and know there are enough live replicas, so we can delete it.
       addToInvalidates(b.corrupted, dn);
       removeStoredBlock(b.stored, node);
       blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
           b, dn);
       return true;
-    } else {
-      blockLog.info("BLOCK* invalidateBlocks: {} on {} is the only copy and" +
-          " was not deleted", b, dn);
-      return false;
     }
   }
 
@@ -2782,7 +2784,7 @@
           " but corrupt replicas map has " + corruptReplicasCount);
     }
     if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
-      invalidateCorruptReplicas(storedBlock, reportedBlock);
+      invalidateCorruptReplicas(storedBlock, reportedBlock, num);
     }
     return storedBlock;
   }
@@ -2814,18 +2816,20 @@
    *
    * @param blk Block whose corrupt replicas need to be invalidated
    */
-  private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
+  private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
+      NumberReplicas numberReplicas) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
     boolean removedFromBlocksMap = true;
     if (nodes == null)
       return;
     // make a copy of the array of nodes in order to avoid
     // ConcurrentModificationException, when the block is removed from the node
-    DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
+    DatanodeDescriptor[] nodesCopy =
+        nodes.toArray(new DatanodeDescriptor[nodes.size()]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
         if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
-            Reason.ANY), node)) {
+            Reason.ANY), node, numberReplicas)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 9caf47c73fe..91739200bf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -201,8 +201,8 @@ class BlocksMap {
     // remove block from the data-node list and the node from the block info
     boolean removed = node.removeBlock(info);
 
-    if (info.getDatanode(0) == null // no datanodes left
-        && info.isDeleted()) {      // does not belong to a file
+    if (info.hasNoStorage()     // no datanodes left
+        && info.isDeleted()) {  // does not belong to a file
       blocks.remove(b);  // remove block from the map
     }
     return removed;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index 3125e2e8aa8..34d6034210f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -24,7 +24,11 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.After;
 import org.junit.Assert;
@@ -234,4 +238,64 @@
         fileSize, readLen);
     Assert.assertArrayEquals(bytes, result.array());
   }
+
+  /**
+   * After reading a corrupted block, make sure the client can correctly report
+   * the corruption to the NameNode.
+   */
+  @Test
+  public void testReportBadBlock() throws IOException {
+    // create file
+    final Path file = new Path("/corrupted");
+    final int length = 10; // length of "corruption"
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, file, bytes);
+
+    // corrupt the first data block
+    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    Assert.assertNotEquals(-1, dnIndex);
+    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
+        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        cellSize, dataBlocks, parityBlocks);
+    // find the first block file
+    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
+    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile.exists());
+    // corrupt the block file
+    LOG.info("Deliberately corrupting file " + blkFile.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile)) {
+      out.write("corruption".getBytes());
+    }
+
+    // disable the heartbeat from DN so that the corrupted block record is kept
+    // in NameNode
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+
+    // do stateful read
+    ByteBuffer result = ByteBuffer.allocate(length);
+    ByteBuffer buf = ByteBuffer.allocate(1024);
+    int readLen = 0;
+    int ret;
+    try (FSDataInputStream in = fs.open(file)) {
+      while ((ret = in.read(buf)) >= 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+    }
+    Assert.assertEquals("The length of file should be the same to write size",
+        length, readLen);
+    Assert.assertArrayEquals(bytes, result.array());
+
+    // check whether the corruption has been reported to the NameNode
+    final FSNamesystem ns = cluster.getNamesystem();
+    final BlockManager bm = ns.getBlockManager();
+    BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
+        .asFile().getBlocks())[0];
+    Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 7876d1a1d2f..8128772f36f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -325,6 +326,7 @@
     final int numStripes = 4;
     final Path filePath = new Path("/corrupt");
     final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+    final BlockManager bm = ns.getBlockManager();
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
         numStripes, false);
 
@@ -375,7 +377,10 @@
     ns.processIncrementalBlockReport(
         cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(2, ns.getCorruptReplicaBlocks());
+    // the total number of corrupted block info is still 1
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    // 2 internal blocks corrupted
+    Assert.assertEquals(2, bm.getCorruptReplicas(stored).size());
 
     // Now change the size of stored block, and test verifying the last
     // block size
@@ -385,9 +390,10 @@
     reports = DFSTestUtil.makeReportForReceivedBlock(reported,
         ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
     ns.processIncrementalBlockReport(
-        cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
+        cluster.getDataNodes().get(4).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
 
     // Now send a parity block report with correct size based on adjusted
     // size of stored block
@@ -400,16 +406,18 @@
     ns.processIncrementalBlockReport(
         cluster.getDataNodes().get(0).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
 
     reported.setBlockId(stored.getBlockId() + 1);
     reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE + 10);
     reports = DFSTestUtil.makeReportForReceivedBlock(reported,
         ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
     ns.processIncrementalBlockReport(
-        cluster.getDataNodes().get(1).getDatanodeId(), reports[0]);
+        cluster.getDataNodes().get(5).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
 
     reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS);
     reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE);
@@ -418,7 +426,8 @@
     ns.processIncrementalBlockReport(
         cluster.getDataNodes().get(2).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
 
   }
 }