HDFS-10826. Correctly report missing EC blocks in FSCK. Contributed by Takanobu Asanuma.

Jing Zhao 2016-10-05 10:52:50 -07:00
parent d6be1e75d8
commit 8867762256
4 changed files with 193 additions and 7 deletions

View File

@@ -1060,7 +1060,8 @@ public class BlockManager implements BlockStatsMXBean {
     }
     // get block locations
-    final int numCorruptNodes = countNodes(blk).corruptReplicas();
+    NumberReplicas numReplicas = countNodes(blk);
+    final int numCorruptNodes = numReplicas.corruptReplicas();
     final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
     if (numCorruptNodes != numCorruptReplicas) {
       LOG.warn("Inconsistent number of corrupt replicas for "
@@ -1069,8 +1070,14 @@ public class BlockManager implements BlockStatsMXBean {
     }
     final int numNodes = blocksMap.numNodes(blk);
-    final boolean isCorrupt = numCorruptReplicas != 0 &&
-        numCorruptReplicas == numNodes;
+    final boolean isCorrupt;
+    if (blk.isStriped()) {
+      BlockInfoStriped sblk = (BlockInfoStriped) blk;
+      isCorrupt = numCorruptReplicas != 0 &&
+          numReplicas.liveReplicas() < sblk.getRealDataBlockNum();
+    } else {
+      isCorrupt = numCorruptReplicas != 0 && numCorruptReplicas == numNodes;
+    }
     final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
     DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;

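For a replicated block, "corrupt" means every remaining replica is corrupt (numCorruptReplicas == numNodes). For a striped block group the patch tightens this to a decodability test: the group is corrupt as soon as fewer live internal blocks remain than getRealDataBlockNum(), the number needed to reconstruct the data (the data-unit count for a full group). The following standalone Java sketch is not part of the patch and uses illustrative names throughout; it restates both rules under the default RS-6-3 policy:

// Illustrative sketch only; assumes RS-6-3, where a full block group has
// 9 internal blocks and any 6 of them suffice to decode the data.
public class StripedCorruptRuleSketch {

  // Replicated block: corrupt only when every remaining copy is corrupt.
  static boolean isCorruptContiguous(int corruptReplicas, int numNodes) {
    return corruptReplicas != 0 && corruptReplicas == numNodes;
  }

  // Striped group: corrupt once too few live internal blocks remain to decode.
  static boolean isCorruptStriped(int corruptReplicas, int liveReplicas,
      int realDataBlockNum) {
    return corruptReplicas != 0 && liveReplicas < realDataBlockNum;
  }

  public static void main(String[] args) {
    // 3 corrupt, 6 live out of 9: still decodable, so not corrupt.
    System.out.println(isCorruptStriped(3, 6, 6)); // false
    // 4 corrupt, 5 live out of 9: unrecoverable, so corrupt.
    System.out.println(isCorruptStriped(4, 5, 6)); // true
  }
}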
View File

@@ -727,15 +727,30 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       String blkName = block.toString();
       report.append(blockNumber + ". " + blkName + " len=" +
           block.getNumBytes());
-      if (totalReplicasPerBlock == 0 && !isCorrupt) {
+      boolean isMissing;
+      if (storedBlock.isStriped()) {
+        isMissing = totalReplicasPerBlock < minReplication;
+      } else {
+        isMissing = totalReplicasPerBlock == 0;
+      }
+      if (isMissing && !isCorrupt) {
         // If the block is corrupted, it means all its available replicas are
-        // corrupted. We don't mark it as missing given these available replicas
-        // might still be accessible as the block might be incorrectly marked as
-        // corrupted by client machines.
+        // corrupted in the case of replication, and it means the state of the
+        // block group is unrecoverable due to some corrupted internal blocks in
+        // the case of EC. We don't mark it as missing given these available
+        // replicas/internal-blocks might still be accessible as the block might
+        // be incorrectly marked as corrupted by client machines.
         report.append(" MISSING!");
         res.addMissing(blkName, block.getNumBytes());
         missing++;
         missize += block.getNumBytes();
+        if (storedBlock.isStriped()) {
+          report.append(" Live_repl=" + liveReplicas);
+          String info = getReplicaInfo(storedBlock);
+          if (!info.isEmpty()){
+            report.append(" ").append(info);
+          }
+        }
       } else {
         report.append(" Live_repl=" + liveReplicas);
         String info = getReplicaInfo(storedBlock);

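The fsck side applies the same asymmetry to "missing": a replicated block is missing only when no replicas are left at all, while a striped block group is already lost once fewer internal blocks than minReplication survive (for a block group this threshold is, in effect, the number of data units). A minimal sketch of that decision, not part of the patch, with illustrative names apart from the identifiers taken from the hunk above:

// Hedged sketch of the patched "missing" test; illustrative class/method
// names. Assumes RS-6-3, where the effective minReplication of a block
// group is 6 (its data-unit count).
public class MissingBlockRuleSketch {

  static boolean isMissing(boolean isStriped, int totalReplicasPerBlock,
      int minReplication) {
    return isStriped
        ? totalReplicasPerBlock < minReplication // group no longer decodable
        : totalReplicasPerBlock == 0;            // all replicas gone
  }

  public static void main(String[] args) {
    // Replicated block with one replica left: degraded but not missing.
    System.out.println(isMissing(false, 1, 1)); // false
    // Striped group with 5 of 9 internal blocks left (< 6): missing.
    System.out.println(isMissing(true, 5, 6));  // true
  }
}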
View File

@@ -166,6 +166,7 @@ public class TestLeaseRecoveryStriped {
     // After recovery, storages are reported by primary DN. we should verify
     // storages reported by blockReport.
     cluster.restartNameNode(true);
+    cluster.waitFirstBRCompleted(0, 10000);
     StripedFileTestUtil.checkData(dfs, p, safeLength,
         new ArrayList<DatanodeInfo>(), oldGS);
   }

View File

@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -87,6 +89,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -94,12 +97,15 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
+import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ReplicationResult;
+import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ErasureCodingResult;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.hdfs.util.HostsFileWriter;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
@@ -2079,4 +2085,161 @@ public class TestFsck {
       }
     }
   }
+
+  @Test (timeout = 300000)
+  public void testFsckCorruptECFile() throws Exception {
+    MiniDFSCluster cluster = null;
+    DistributedFileSystem fs = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      int dataBlocks = ErasureCodingPolicyManager
+          .getSystemDefaultPolicy().getNumDataUnits();
+      int parityBlocks = ErasureCodingPolicyManager
+          .getSystemDefaultPolicy().getNumParityUnits();
+      int cellSize = ErasureCodingPolicyManager
+          .getSystemDefaultPolicy().getCellSize();
+      int totalSize = dataBlocks + parityBlocks;
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(totalSize).build();
+      fs = cluster.getFileSystem();
+      Map<Integer, Integer> dnIndices = new HashMap<>();
+      ArrayList<DataNode> dnList = cluster.getDataNodes();
+      for (int i = 0; i < totalSize; i++) {
+        dnIndices.put(dnList.get(i).getIpcPort(), i);
+      }
+
+      // create file
+      Path ecDirPath = new Path("/striped");
+      fs.mkdir(ecDirPath, FsPermission.getDirDefault());
+      fs.getClient().setErasureCodingPolicy(ecDirPath.toString(), null);
+      Path file = new Path(ecDirPath, "corrupted");
+      final int length = cellSize * dataBlocks;
+      final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+      DFSTestUtil.writeFile(fs, file, bytes);
+
+      LocatedStripedBlock lsb = (LocatedStripedBlock)fs.getClient()
+          .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+      final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
+          cellSize, dataBlocks, parityBlocks);
+
+      // make an unrecoverable ec file with corrupted blocks
+      for(int i = 0; i < parityBlocks + 1; i++) {
+        int ipcPort = blks[i].getLocations()[0].getIpcPort();
+        int dnIndex = dnIndices.get(ipcPort);
+        File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
+        File blkFile = MiniDFSCluster.getBlockFile(storageDir,
+            blks[i].getBlock());
+        Assert.assertTrue("Block file does not exist", blkFile.exists());
+
+        FileOutputStream out = new FileOutputStream(blkFile);
+        out.write("corruption".getBytes());
+      }
+
+      // disable the heart beat from DN so that the corrupted block record is
+      // kept in NameNode
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+      }
+
+      // Read the file to trigger reportBadBlocks
+      try {
+        IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,
+            true);
+      } catch (IOException ie) {
+        assertTrue(ie.getMessage().contains(
+            "missingChunksNum=" + (parityBlocks + 1)));
+      }
+
+      waitForUnrecoverableBlockGroup(conf);
+
+      String outStr = runFsck(conf, 1, true, "/");
+      assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+    } finally {
+      if (fs != null) {
+        try {
+          fs.close();
+        } catch (Exception e) {
+        }
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test (timeout = 300000)
+  public void testFsckMissingECFile() throws Exception {
+    MiniDFSCluster cluster = null;
+    DistributedFileSystem fs = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      int dataBlocks = ErasureCodingPolicyManager
+          .getSystemDefaultPolicy().getNumDataUnits();
+      int parityBlocks = ErasureCodingPolicyManager
+          .getSystemDefaultPolicy().getNumParityUnits();
+      int cellSize = ErasureCodingPolicyManager
+          .getSystemDefaultPolicy().getCellSize();
+      int totalSize = dataBlocks + parityBlocks;
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(totalSize).build();
+      fs = cluster.getFileSystem();
+
+      // create file
+      Path ecDirPath = new Path("/striped");
+      fs.mkdir(ecDirPath, FsPermission.getDirDefault());
+      fs.getClient().setErasureCodingPolicy(ecDirPath.toString(), null);
+      Path file = new Path(ecDirPath, "missing");
+      final int length = cellSize * dataBlocks;
+      final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+      DFSTestUtil.writeFile(fs, file, bytes);
+
+      // make an unrecoverable ec file with missing blocks
+      ArrayList<DataNode> dns = cluster.getDataNodes();
+      DatanodeID dnId;
+      for (int i = 0; i < parityBlocks + 1; i++) {
+        dnId = dns.get(i).getDatanodeId();
+        cluster.stopDataNode(dnId.getXferAddr());
+        cluster.setDataNodeDead(dnId);
+      }
+
+      waitForUnrecoverableBlockGroup(conf);
+
+      String outStr = runFsck(conf, 1, true, "/", "-files", "-blocks",
+          "-locations");
+      assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+      assertTrue(outStr.contains("Live_repl=" + (dataBlocks - 1)));
+    } finally {
+      if (fs != null) {
+        try {
+          fs.close();
+        } catch (Exception e) {
+        }
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void waitForUnrecoverableBlockGroup(Configuration conf)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          ByteArrayOutputStream bStream = new ByteArrayOutputStream();
+          PrintStream out = new PrintStream(bStream, true);
+          ToolRunner.run(new DFSck(conf, out), new String[] {"/"});
+          String outStr = bStream.toString();
+          if (outStr.contains("UNRECOVERABLE BLOCK GROUPS")) {
+            return true;
+          }
+        } catch (Exception e) {
+          FSImage.LOG.error("Exception caught", e);
+          Assert.fail("Caught unexpected exception.");
+        }
+        return false;
+      }
+    }, 1000, 60000);
+  }
 }
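Both tests drive the block group into an unrecoverable state by damaging parityBlocks + 1 internal blocks, and testFsckMissingECFile additionally asserts the Live_repl count that the NamenodeFsck change above now prints for missing striped blocks. The arithmetic behind those expectations, as a standalone check that is not part of the patch and hard-codes the default RS-6-3 policy the tests look up via ErasureCodingPolicyManager:

// Standalone arithmetic check for the test expectations; RS-6-3 assumed.
public class EcFsckArithmeticSketch {
  public static void main(String[] args) {
    int dataBlocks = 6;    // RS-6-3 data units
    int parityBlocks = 3;  // RS-6-3 parity units
    int lost = parityBlocks + 1;                 // 4 internal blocks lost
    int live = dataBlocks + parityBlocks - lost; // 9 - 4 = 5 still live
    // fsck prints "Live_repl=5", i.e. dataBlocks - 1 live internal blocks...
    System.out.println("Live_repl=" + live
        + " (equals dataBlocks - 1: " + (live == dataBlocks - 1) + ")");
    // ...which is below the dataBlocks needed to decode the stripe, so fsck
    // must report the group as corrupt/unrecoverable, not under-replicated.
    System.out.println("decodable=" + (live >= dataBlocks)); // false
  }
}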