From f715f141856cb6a4c6574893f40f9865653b631e Mon Sep 17 00:00:00 2001
From: Kihwal Lee
Date: Thu, 28 Apr 2016 16:39:48 -0500
Subject: [PATCH] HDFS-9958. BlockManager#createLocatedBlocks can throw NPE
 for corruptBlocks on failed storages. Contributed by Kuhu Shukla.

---
 .../server/blockmanagement/BlockManager.java |  22 +++-
 .../hadoop/hdfs/TestFileCorruption.java       |  87 ++++++++++++++++++-
 2 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index d811d1df278..58de45e51e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -953,8 +953,8 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     final int numNodes = blocksMap.numNodes(blk);
-    final boolean isCorrupt = numCorruptNodes == numNodes;
-    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+    final boolean isCorrupt = numCorruptReplicas == numNodes;
+    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
     final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     int j = 0;
     if (numMachines > 0) {
@@ -1232,11 +1232,23 @@ public class BlockManager implements BlockStatsMXBean {
           + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
           + ") does not exist");
     }
-
+
+    DatanodeStorageInfo storage = null;
+    if (storageID != null) {
+      storage = node.getStorageInfo(storageID);
+    }
+    if (storage == null) {
+      storage = storedBlock.findStorageInfo(node);
+    }
+
+    if (storage == null) {
+      blockLog.debug("BLOCK* findAndMarkBlockAsCorrupt: {} not found on {}",
+          blk, dn);
+      return;
+    }
     markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
             blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
-        storageID == null ? null : node.getStorageInfo(storageID),
-        node);
+        storage, node);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
index c1a7ebbd99e..011baa1ea3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
@@ -18,15 +18,22 @@
 
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.FileOutputStream;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
@@ -36,6 +43,8 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -167,7 +176,83 @@ public class TestFileCorruption {
       }
     }
   }
-
+
+  @Test
+  public void testCorruptionWithDiskFailure() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      BlockManager bm = cluster.getNamesystem().getBlockManager();
+      FileSystem fs = cluster.getFileSystem();
+      final Path FILE_PATH = new Path("/tmp.txt");
+      final long FILE_LEN = 1L;
+      DFSTestUtil.createFile(fs, FILE_PATH, FILE_LEN, (short) 3, 1L);
+
+      // get the block
+      final String bpid = cluster.getNamesystem().getBlockPoolId();
+      File storageDir = cluster.getInstanceStorageDir(0, 0);
+      File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+      assertTrue("Data directory does not exist", dataDir.exists());
+      ExtendedBlock blk = getFirstBlock(cluster.getDataNodes().get(0), bpid);
+      if (blk == null) {
+        blk = getFirstBlock(cluster.getDataNodes().get(0), bpid);
+      }
+      assertFalse("Data directory does not contain any blocks or there was an"
+          + " "
+          + "IO error", blk == null);
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 3);
+      FSNamesystem ns = cluster.getNamesystem();
+      //fail the storage on that node which has the block
+      try {
+        ns.writeLock();
+        updateAllStorages(bm);
+      } finally {
+        ns.writeUnlock();
+      }
+      ns.writeLock();
+      try {
+        markAllBlocksAsCorrupt(bm, blk);
+      } finally {
+        ns.writeUnlock();
+      }
+
+      // open the file
+      fs.open(FILE_PATH);
+
+      //clean up
+      fs.delete(FILE_PATH, false);
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+
+  }
+
+  private void markAllBlocksAsCorrupt(BlockManager bm,
+      ExtendedBlock blk) throws IOException {
+    for (DatanodeStorageInfo info : bm.getStorages(blk.getLocalBlock())) {
+      bm.findAndMarkBlockAsCorrupt(
+          blk, info.getDatanodeDescriptor(), info.getStorageID(), "STORAGE_ID");
+    }
+  }
+
+  private void updateAllStorages(BlockManager bm) {
+    for (DatanodeDescriptor dd : bm.getDatanodeManager().getDatanodes()) {
+      Set<DatanodeStorageInfo> setInfos = new HashSet<DatanodeStorageInfo>();
+      DatanodeStorageInfo[] infos = dd.getStorageInfos();
+      Random random = new Random();
+      for (int i = 0; i < infos.length; i++) {
+        int blkId = random.nextInt(101);
+        DatanodeStorage storage = new DatanodeStorage(Integer.toString(blkId),
+            DatanodeStorage.State.FAILED, StorageType.DISK);
+        infos[i].updateFromStorage(storage);
+        setInfos.add(infos[i]);
+      }
+    }
+  }
+
   private static ExtendedBlock getFirstBlock(DataNode dn, String bpid) {
     Map<DatanodeStorage, BlockListAsLongs> blockReports =
         dn.getFSDataset().getBlockReports(bpid);