diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 10a8cdea433..ad1e4e77e16 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -367,3 +367,6 @@
 
     HDFS-8760. Erasure Coding: reuse BlockReader when reading the same block in
     pread. (jing9)
+
+    HDFS-8781. Erasure Coding: Correctly handle BlockManager#InvalidateBlocks for
+    striped block. (Yi Liu via jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7872baa22e5..1594a9ad41d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -783,7 +783,10 @@ public class BlockManager {
 
     // remove this block from the list of pending blocks to be deleted.
     for (DatanodeStorageInfo storage : targets) {
-      invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
+      final Block b = getBlockOnStorage(oldBlock, storage);
+      if (b != null) {
+        invalidateBlocks.remove(storage.getDatanodeDescriptor(), b);
+      }
     }
 
     // Adjust safe-mode totals, since under-construction blocks don't
@@ -802,12 +805,14 @@ public class BlockManager {
   /**
    * Get all valid locations of the block
    */
-  private List<DatanodeStorageInfo> getValidLocations(Block block) {
+  private List<DatanodeStorageInfo> getValidLocations(BlockInfo block) {
     final List<DatanodeStorageInfo> locations
         = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       // filter invalidate replicas
-      if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
+      Block b = getBlockOnStorage(block, storage);
+      if(b != null &&
+          !invalidateBlocks.contains(storage.getDatanodeDescriptor(), b)) {
         locations.add(storage);
       }
     }
@@ -1156,7 +1161,10 @@ public class BlockManager {
     while(it.hasNext()) {
       BlockInfo block = it.next();
       removeStoredBlock(block, node);
-      invalidateBlocks.remove(node, block);
+      final Block b = getBlockOnStorage(block, storageInfo);
+      if (b != null) {
+        invalidateBlocks.remove(node, b);
+      }
     }
     namesystem.checkSafeMode();
   }
@@ -1184,7 +1192,7 @@ public class BlockManager {
     for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
         State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      final Block b = getBlockToInvalidate(storedBlock, storage);
+      final Block b = getBlockOnStorage(storedBlock, storage);
       if (b != null) {
         invalidateBlocks.add(b, node, false);
         datanodes.append(node).append(" ");
@@ -1196,7 +1204,7 @@ public class BlockManager {
     }
   }
 
-  private Block getBlockToInvalidate(BlockInfo storedBlock,
+  private Block getBlockOnStorage(BlockInfo storedBlock,
       DatanodeStorageInfo storage) {
     return storedBlock.isStriped() ?
         ((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : storedBlock;
@@ -2054,7 +2062,10 @@ public class BlockManager {
       // more than one storage on a datanode (and because it's a difficult
       // assumption to really enforce)
       removeStoredBlock(block, zombie.getDatanodeDescriptor());
-      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
+      Block b = getBlockOnStorage(block, zombie);
+      if (b != null) {
+        invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b);
+      }
     }
     assert(zombie.numBlocks() == 0);
     LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
@@ -3273,7 +3284,7 @@ public class BlockManager {
     // should be deleted. Items are removed from the invalidate list
     // upon giving instructions to the datanodes.
     //
-    final Block blockToInvalidate = getBlockToInvalidate(storedBlock, chosen);
+    final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
     addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
     blockLog.info("BLOCK* chooseExcessReplicates: "
         +"({}, {}) is added to invalidated blocks set", chosen, storedBlock);
@@ -3838,6 +3849,12 @@ public class BlockManager {
     return toInvalidate.size();
   }
 
+  @VisibleForTesting
+  public boolean containsInvalidateBlock(final DatanodeInfo dn,
+      final Block block) {
+    return invalidateBlocks.contains(dn, block);
+  }
+
   boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
     if (!this.shouldCheckForEnoughRacks) {
       return true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 47bc7652e09..e4366c91161 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -694,6 +694,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  @VisibleForTesting
+  public boolean containsInvalidateBlock(Block block) {
+    synchronized (invalidateBlocks) {
+      return invalidateBlocks.contains(block);
+    }
+  }
+
   /**
    * @return Approximate number of blocks currently scheduled to be written
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index 34d6034210f..8afea198a7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -22,13 +22,16 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.After;
 import org.junit.Assert;
@@ -274,28 +277,68 @@ public class TestReadStripedFileWithDecoding {
       DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
     }
 
-    // do stateful read
-    ByteBuffer result = ByteBuffer.allocate(length);
-    ByteBuffer buf = ByteBuffer.allocate(1024);
-    int readLen = 0;
-    int ret;
-    try (FSDataInputStream in = fs.open(file)) {
-      while ((ret = in.read(buf)) >= 0) {
-        readLen += ret;
-        buf.flip();
-        result.put(buf);
-        buf.clear();
+    try {
+      // do stateful read
+      ByteBuffer result = ByteBuffer.allocate(length);
+      ByteBuffer buf = ByteBuffer.allocate(1024);
+      int readLen = 0;
+      int ret;
+      try (FSDataInputStream in = fs.open(file)) {
+        while ((ret = in.read(buf)) >= 0) {
+          readLen += ret;
+          buf.flip();
+          result.put(buf);
+          buf.clear();
+        }
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          length, readLen);
+      Assert.assertArrayEquals(bytes, result.array());
+
+      // check whether the corruption has been reported to the NameNode
+      final FSNamesystem ns = cluster.getNamesystem();
+      final BlockManager bm = ns.getBlockManager();
+      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
+          .asFile().getBlocks())[0];
+      Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
+    } finally {
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
       }
     }
-    Assert.assertEquals("The length of file should be the same to write size",
-        length, readLen);
-    Assert.assertArrayEquals(bytes, result.array());
+  }
 
-    // check whether the corruption has been reported to the NameNode
-    final FSNamesystem ns = cluster.getNamesystem();
-    final BlockManager bm = ns.getBlockManager();
-    BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
-        .asFile().getBlocks())[0];
-    Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
+  @Test
+  public void testInvalidateBlock() throws IOException {
+    final Path file = new Path("/invalidate");
+    final int length = 10;
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, file, bytes);
+
+    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    Assert.assertNotEquals(-1, dnIndex);
+    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
+        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        cellSize, dataBlocks, parityBlocks);
+    final Block b = blks[0].getBlock().getLocalBlock();
+
+    DataNode dn = cluster.getDataNodes().get(dnIndex);
+    // disable the heartbeat from DN so that the invalidated block record is kept
+    // in NameNode until heartbeat expires and NN mark the dn as dead
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+
+    try {
+      // delete the file
+      fs.delete(file, true);
+      // check the block is added to invalidateBlocks
+      final FSNamesystem fsn = cluster.getNamesystem();
+      final BlockManager bm = fsn.getBlockManager();
+      DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      Assert.assertTrue(bm.containsInvalidateBlock(
+          blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
+    } finally {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+    }
   }
 }