HDFS-9958. BlockManager#createLocatedBlocks can throw NPE for corruptBlocks on failed storages. Contributed by Kuhu Shukla.

Kihwal Lee 2016-04-28 16:39:48 -05:00
parent 7271e91b79
commit f715f14185
2 changed files with 103 additions and 6 deletions

org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -953,8 +953,8 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     final int numNodes = blocksMap.numNodes(blk);
-    final boolean isCorrupt = numCorruptNodes == numNodes;
-    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+    final boolean isCorrupt = numCorruptReplicas == numNodes;
+    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
     final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     int j = 0;
     if (numMachines > 0) {
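The two replaced lines above change which corrupt-replica counter sizes the machines array. A hedged reading of the failure mode (only this hunk is visible here): numCorruptNodes is derived from countNodes(), which skips FAILED storages, while the loop that fills machines filters against the corrupt-replicas map, so a corrupt replica sitting on a failed storage makes the array one slot too long and the unfilled null slot surfaces later as the reported NPE. A self-contained sketch of that arithmetic with hypothetical numbers (not BlockManager code):

// Illustration only -- hypothetical counts, not BlockManager internals.
public class CorruptSizingSketch {
  public static void main(String[] args) {
    int numNodes = 3;            // storages recorded in blocksMap, one of them FAILED
    int numCorruptNodes = 0;     // corrupt replicas counted over non-FAILED storages only
    int numCorruptReplicas = 1;  // corrupt replicas tracked in the corrupt-replicas map

    // Old sizing: 3 slots, but the fill loop (driven by the corrupt-replicas map)
    // only places 2 storages, leaving a trailing null that can NPE later.
    int oldNumMachines = (numCorruptNodes == numNodes)
        ? numNodes : numNodes - numCorruptNodes;

    // New sizing: 2 slots, matching what the fill loop actually places.
    int newNumMachines = (numCorruptReplicas == numNodes)
        ? numNodes : numNodes - numCorruptReplicas;

    int placed = numNodes - numCorruptReplicas;   // non-corrupt locations that get copied in
    System.out.println("old: length=" + oldNumMachines + ", placed=" + placed);  // 3 vs 2
    System.out.println("new: length=" + newNumMachines + ", placed=" + placed);  // 2 vs 2
  }
}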
@@ -1233,10 +1233,22 @@ public class BlockManager implements BlockStatsMXBean {
           + ") does not exist");
     }
+    DatanodeStorageInfo storage = null;
+    if (storageID != null) {
+      storage = node.getStorageInfo(storageID);
+    }
+    if (storage == null) {
+      storage = storedBlock.findStorageInfo(node);
+    }
+    if (storage == null) {
+      blockLog.debug("BLOCK* findAndMarkBlockAsCorrupt: {} not found on {}",
+          blk, dn);
+      return;
+    }
     markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
         blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
-        storageID == null ? null : node.getStorageInfo(storageID),
-        node);
+        storage, node);
   }
 
   /**
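The second hunk moves the storage lookup ahead of markBlockAsCorrupt: use the storage named in the corruption report when it resolves, otherwise fall back to the storage the stored block already records for that datanode, and if neither resolves, log at debug and ignore the report rather than pass a null storage along. A stand-alone sketch of that lookup order, using made-up types and names instead of the HDFS ones:

import java.util.HashMap;
import java.util.Map;

// Illustration only: mirrors the "reported id, then block map, then give up" order.
public class StorageLookupSketch {
  static String resolveStorage(String reportedStorageId,
                               Map<String, String> storagesOnNode,        // storageID -> state
                               String storageRecordedForBlock) {
    String storage = null;
    if (reportedStorageId != null) {
      // keep the reported id only if the node actually knows that storage
      storage = (storagesOnNode.get(reportedStorageId) != null) ? reportedStorageId : null;
    }
    if (storage == null) {
      storage = storageRecordedForBlock;                                  // fall back to block map
    }
    if (storage == null) {
      System.out.println("corruption report ignored: storage not found"); // debug-log-and-return
    }
    return storage;
  }

  public static void main(String[] args) {
    Map<String, String> storages = new HashMap<>();
    storages.put("DS-1", "NORMAL");
    System.out.println(resolveStorage("DS-1", storages, null));    // resolved from the report
    System.out.println(resolveStorage("DS-9", storages, "DS-1"));  // falls back to block map
    System.out.println(resolveStorage(null, storages, null));      // null: report dropped
  }
}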

org/apache/hadoop/hdfs/TestFileCorruption.java

@@ -18,15 +18,22 @@
 
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.FileOutputStream;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
@@ -36,6 +43,8 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -168,6 +177,82 @@ public class TestFileCorruption {
     }
   }
 
+  @Test
+  public void testCorruptionWithDiskFailure() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      BlockManager bm = cluster.getNamesystem().getBlockManager();
+      FileSystem fs = cluster.getFileSystem();
+      final Path FILE_PATH = new Path("/tmp.txt");
+      final long FILE_LEN = 1L;
+      DFSTestUtil.createFile(fs, FILE_PATH, FILE_LEN, (short) 3, 1L);
+      // get the block
+      final String bpid = cluster.getNamesystem().getBlockPoolId();
+      File storageDir = cluster.getInstanceStorageDir(0, 0);
+      File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+      assertTrue("Data directory does not exist", dataDir.exists());
+      ExtendedBlock blk = getFirstBlock(cluster.getDataNodes().get(0), bpid);
+      if (blk == null) {
+        blk = getFirstBlock(cluster.getDataNodes().get(0), bpid);
+      }
+      assertFalse("Data directory does not contain any blocks or there was an" +
+          " " +
+          "IO error", blk == null);
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 3);
+      FSNamesystem ns = cluster.getNamesystem();
+      //fail the storage on that node which has the block
+      try {
+        ns.writeLock();
+        updateAllStorages(bm);
+      } finally {
+        ns.writeUnlock();
+      }
+      ns.writeLock();
+      try {
+        markAllBlocksAsCorrupt(bm, blk);
+      } finally {
+        ns.writeUnlock();
+      }
+      // open the file
+      fs.open(FILE_PATH);
+      //clean up
+      fs.delete(FILE_PATH, false);
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+
+  private void markAllBlocksAsCorrupt(BlockManager bm,
+      ExtendedBlock blk) throws IOException {
+    for (DatanodeStorageInfo info : bm.getStorages(blk.getLocalBlock())) {
+      bm.findAndMarkBlockAsCorrupt(
+          blk, info.getDatanodeDescriptor(), info.getStorageID(), "STORAGE_ID");
+    }
+  }
+
+  private void updateAllStorages(BlockManager bm) {
+    for (DatanodeDescriptor dd : bm.getDatanodeManager().getDatanodes()) {
+      Set<DatanodeStorageInfo> setInfos = new HashSet<DatanodeStorageInfo>();
+      DatanodeStorageInfo[] infos = dd.getStorageInfos();
+      Random random = new Random();
+      for (int i = 0; i < infos.length; i++) {
+        int blkId = random.nextInt(101);
+        DatanodeStorage storage = new DatanodeStorage(Integer.toString(blkId),
+            DatanodeStorage.State.FAILED, StorageType.DISK);
+        infos[i].updateFromStorage(storage);
+        setInfos.add(infos[i]);
+      }
+    }
+  }
+
   private static ExtendedBlock getFirstBlock(DataNode dn, String bpid) {
     Map<DatanodeStorage, BlockListAsLongs> blockReports =
         dn.getFSDataset().getBlockReports(bpid);