HDFS-7768. Change fsck to support EC files. Contributed by Takanobu Asanuma

Tsz-Wo Nicholas Sze 2015-05-24 23:07:34 +08:00 committed by Zhe Zhang
parent 4ae32abdf4
commit 0ed92e5b13
3 changed files with 238 additions and 45 deletions
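
Summary: fsck previously accumulated everything in a single Result. This change splits the accounting into a ReplicationResult for ordinary replicated files and a new ErasureCodingResult for striped files, hoists the directory and symlink counters into NamenodeFsck itself, and prints the two summaries under separate "Replicated Blocks" and "Erasure Coded Block Groups" headings. The overall HEALTHY/CORRUPT verdict is now the conjunction of both results.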

hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

@@ -253,3 +253,5 @@
HDFS-8441. Erasure Coding: make condition check earlier for setReplication.
(waltersu4549)
HDFS-7768. Change fsck to support EC files. (Takanobu Asanuma via szetszwo)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -74,6 +74,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
@@ -123,6 +124,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
private final int totalDatanodes;
private final InetAddress remoteAddress;
private long totalDirs = 0L;
private long totalSymlinks = 0L;
private String lostFound = null;
private boolean lfInited = false;
private boolean lfInitedOk = false;
@@ -356,13 +360,21 @@ public void fsck() {
namenode.getNamesystem().getBlockManager().getStoragePolicies());
}
Result res = new Result(conf);
Result replRes = new ReplicationResult(conf);
Result ecRes = new ErasureCodingResult(conf);
check(path, file, res);
check(path, file, replRes, ecRes);
out.println(res);
out.println(" Number of data-nodes:\t\t" + totalDatanodes);
out.print("\nStatus: ");
out.println(replRes.isHealthy() && ecRes.isHealthy() ? "HEALTHY" : "CORRUPT");
out.println(" Number of data-nodes:\t" + totalDatanodes);
out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
out.println(" Total dirs:\t\t\t" + totalDirs);
out.println(" Total symlinks:\t\t" + totalSymlinks);
out.println("\nReplicated Blocks:");
out.println(replRes);
out.println("\nErasure Coded Block Groups:");
out.println(ecRes);
if (this.showStoragePolcies) {
out.print(storageTypeSummary.toString());
@@ -382,7 +394,7 @@ public void fsck() {
// of file system and return appropriate code. Changing the output
// string might break testcases. Also note this must be the last line
// of the report.
if (res.isHealthy()) {
if (replRes.isHealthy() && ecRes.isHealthy()) {
out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS);
} else {
out.print("\n\nThe filesystem under path '" + path + "' " + CORRUPT_STATUS);
@@ -425,42 +437,49 @@ private void listCorruptFileBlocks() throws IOException {
}
@VisibleForTesting
void check(String parent, HdfsFileStatus file, Result res) throws IOException {
void check(String parent, HdfsFileStatus file, Result replRes, Result ecRes)
throws IOException {
String path = file.getFullName(parent);
if (file.isDir()) {
checkDir(path, res);
checkDir(path, replRes, ecRes);
return;
}
if (file.isSymlink()) {
if (showFiles) {
out.println(path + " <symlink>");
}
res.totalSymlinks++;
totalSymlinks++;
return;
}
LocatedBlocks blocks = getBlockLocations(path, file);
if (blocks == null) { // the file is deleted
return;
}
collectFileSummary(path, file, res, blocks);
collectBlocksSummary(parent, file, res, blocks);
final Result r = file.getReplication() == 0? ecRes: replRes;
collectFileSummary(path, file, r, blocks);
if (showprogress && (replRes.totalFiles + ecRes.totalFiles) % 100 == 0) {
out.println();
out.flush();
}
collectBlocksSummary(parent, file, r, blocks);
}
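
Two details in the reworked check() are easy to miss: on the HDFS-7285 branch a striped file reports a replication factor of 0, which is what routes it to ecRes, and the progress newline printed every 100 files now lives here so the count spans both result objects rather than one. A compilable toy of the routing rule (class and method names hypothetical):

    // EcDispatchSketch.java - toy illustration of the routing in check() above.
    // Striped (EC) files report replication 0, so their stats go to the EC result.
    public class EcDispatchSketch {
        static String pick(short replication) {
            return replication == 0 ? "ErasureCodingResult" : "ReplicationResult";
        }
        public static void main(String[] args) {
            System.out.println(pick((short) 0)); // ErasureCodingResult
            System.out.println(pick((short) 3)); // ReplicationResult
        }
    }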
private void checkDir(String path, Result res) throws IOException {
private void checkDir(String path, Result replRes, Result ecRes) throws IOException {
if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
+ Path.SEPARATOR)
+ HdfsConstants.DOT_SNAPSHOT_DIR;
HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo(
snapshotPath);
check(snapshotPath, snapshotFileInfo, res);
check(snapshotPath, snapshotFileInfo, replRes, ecRes);
}
byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
DirectoryListing thisListing;
if (showFiles) {
out.println(path + " <dir>");
}
res.totalDirs++;
totalDirs++;
do {
assert lastReturnedName != null;
thisListing = namenode.getRpcServer().getListing(
@@ -470,7 +489,7 @@ private void checkDir(String path, Result res) throws IOException {
}
HdfsFileStatus[] files = thisListing.getPartialListing();
for (int i = 0; i < files.length; i++) {
check(path, files[i], res);
check(path, files[i], replRes, ecRes);
}
lastReturnedName = thisListing.getLastName();
} while (thisListing.hasMore());
@@ -517,10 +536,6 @@ private void collectFileSummary(String path, HdfsFileStatus file, Result res,
} else if (showprogress) {
out.print('.');
}
if ((showprogress) && res.totalFiles % 100 == 0) {
out.println();
out.flush();
}
}
private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res,
@@ -543,7 +558,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
block.getLocalBlock());
// count decommissionedReplicas / decommissioningReplicas
NumberReplicas numberReplicas = bm.countNodes(storedBlock);
int decommissionedReplicas = numberReplicas.decommissioned();;
int decommissionedReplicas = numberReplicas.decommissioned();
int decommissioningReplicas = numberReplicas.decommissioning();
res.decommissionedReplicas += decommissionedReplicas;
res.decommissioningReplicas += decommissioningReplicas;
@@ -555,7 +570,15 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
res.totalReplicas += totalReplicasPerBlock;
// count expected replicas
short targetFileReplication = file.getReplication();
short targetFileReplication;
if(file.getReplication() == 0) {
INode inode = namenode.getNamesystem().getFSDirectory().getINode(path);
INodesInPath iip = INodesInPath.fromINode(inode);
ECSchema ecSchema = namenode.getNamesystem().getFSDirectory().getECSchema(iip);
targetFileReplication = (short) (ecSchema.getNumDataUnits() + ecSchema.getNumParityUnits());
} else {
targetFileReplication = file.getReplication();
}
res.numExpectedReplicas += targetFileReplication;
// count under min repl'd blocks
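
With the system default schema assumed to be Reed-Solomon with 6 data and 3 parity units (the RS-6-3 default of the EC branch), the expected count per block group works out to 6 + 3 = 9 internal blocks. A worked sketch under that assumption:

    // EcExpectedReplicas.java - worked example; the 6/3 split is an assumption
    // about the system default schema, mirroring getNumDataUnits()/getNumParityUnits().
    public class EcExpectedReplicas {
        public static void main(String[] args) {
            int numDataUnits = 6;
            int numParityUnits = 3;
            short targetFileReplication = (short) (numDataUnits + numParityUnits);
            System.out.println("expected per block group: " + targetFileReplication); // 9
        }
    }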
@@ -981,7 +1004,7 @@ static class Result {
long missingReplicas = 0L;
long decommissionedReplicas = 0L;
long decommissioningReplicas = 0L;
long numUnderMinReplicatedBlocks=0L;
long numUnderMinReplicatedBlocks = 0L;
long numOverReplicatedBlocks = 0L;
long numUnderReplicatedBlocks = 0L;
long numMisReplicatedBlocks = 0L; // blocks that do not satisfy block placement policy
@@ -991,20 +1014,14 @@ static class Result {
long totalOpenFilesBlocks = 0L;
long totalFiles = 0L;
long totalOpenFiles = 0L;
long totalDirs = 0L;
long totalSymlinks = 0L;
long totalSize = 0L;
long totalOpenFilesSize = 0L;
long totalReplicas = 0L;
final short replication;
final int minReplication;
Result(Configuration conf) {
this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
Result(int minReplication) {
this.minReplication = minReplication;
}
/**
@@ -1032,19 +1049,28 @@ float getReplicationFactor() {
return 0.0f;
return (float) (totalReplicas) / (float) totalBlocks;
}
}
@VisibleForTesting
static class ReplicationResult extends Result {
final short replication;
ReplicationResult(Configuration conf) {
super(conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT));
this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
}
@Override
public String toString() {
StringBuilder res = new StringBuilder();
res.append("Status: ").append((isHealthy() ? "HEALTHY" : "CORRUPT"))
.append("\n Total size:\t").append(totalSize).append(" B");
res.append(" Total size:\t").append(totalSize).append(" B");
if (totalOpenFilesSize != 0) {
res.append(" (Total open files size: ").append(totalOpenFilesSize)
.append(" B)");
}
res.append("\n Total dirs:\t").append(totalDirs).append(
"\n Total files:\t").append(totalFiles);
res.append("\n Total symlinks:\t\t").append(totalSymlinks);
res.append("\n Total files:\t").append(totalFiles);
if (totalOpenFiles != 0) {
res.append(" (Files currently being written: ").append(totalOpenFiles)
.append(")");
@@ -1134,4 +1160,116 @@ public String toString() {
return res.toString();
}
}
@VisibleForTesting
static class ErasureCodingResult extends Result {
final String ecSchema;
ErasureCodingResult(Configuration conf) {
this(ErasureCodingSchemaManager.getSystemDefaultSchema());
}
ErasureCodingResult(ECSchema ecSchema) {
super(ecSchema.getNumDataUnits());
this.ecSchema = ecSchema.getSchemaName();
}
@Override
public String toString() {
StringBuilder res = new StringBuilder();
res.append(" Total size:\t").append(totalSize).append(" B");
if (totalOpenFilesSize != 0) {
res.append(" (Total open files size: ").append(totalOpenFilesSize)
.append(" B)");
}
res.append("\n Total files:\t").append(totalFiles);
if (totalOpenFiles != 0) {
res.append(" (Files currently being written: ").append(totalOpenFiles)
.append(")");
}
res.append("\n Total block groups (validated):\t").append(totalBlocks);
if (totalBlocks > 0) {
res.append(" (avg. block group size ").append((totalSize / totalBlocks))
.append(" B)");
}
if (totalOpenFilesBlocks != 0) {
res.append(" (Total open file block groups (not validated): ").append(
totalOpenFilesBlocks).append(")");
}
if (corruptFiles > 0 || numUnderMinReplicatedBlocks > 0) {
res.append("\n ********************************");
if(numUnderMinReplicatedBlocks>0){
res.append("\n UNRECOVERABLE BLOCK GROUPS:\t").append(numUnderMinReplicatedBlocks);
if(totalBlocks>0){
res.append(" (").append(
((float) (numUnderMinReplicatedBlocks * 100) / (float) totalBlocks))
.append(" %)");
}
res.append("\n ").append("MIN REQUIRED EC BLOCK:\t")
.append(minReplication);
}
if(corruptFiles>0) {
res.append(
"\n CORRUPT FILES:\t").append(corruptFiles);
if (missingSize > 0) {
res.append("\n MISSING BLOCK GROUPS:\t").append(missingIds.size()).append(
"\n MISSING SIZE:\t\t").append(missingSize).append(" B");
}
if (corruptBlocks > 0) {
res.append("\n CORRUPT BLOCK GROUPS: \t").append(corruptBlocks).append(
"\n CORRUPT SIZE:\t\t").append(corruptSize).append(" B");
}
}
res.append("\n ********************************");
}
res.append("\n Minimally erasure-coded block groups:\t").append(
numMinReplicatedBlocks);
if (totalBlocks > 0) {
res.append(" (").append(
((float) (numMinReplicatedBlocks * 100) / (float) totalBlocks))
.append(" %)");
}
res.append("\n Over-erasure-coded block groups:\t")
.append(numOverReplicatedBlocks);
if (totalBlocks > 0) {
res.append(" (").append(
((float) (numOverReplicatedBlocks * 100) / (float) totalBlocks))
.append(" %)");
}
res.append("\n Under-erasure-coded block groups:\t").append(
numUnderReplicatedBlocks);
if (totalBlocks > 0) {
res.append(" (").append(
((float) (numUnderReplicatedBlocks * 100) / (float) totalBlocks))
.append(" %)");
}
res.append("\n Unsatisfactory placement block groups:\t\t")
.append(numMisReplicatedBlocks);
if (totalBlocks > 0) {
res.append(" (").append(
((float) (numMisReplicatedBlocks * 100) / (float) totalBlocks))
.append(" %)");
}
res.append("\n Default schema:\t").append(ecSchema)
.append("\n Average block group size:\t").append(
getReplicationFactor()).append("\n Missing block groups:\t\t").append(
missingIds.size()).append("\n Corrupt block groups:\t\t").append(
corruptBlocks).append("\n Missing ec-blocks:\t\t").append(
missingReplicas);
if (totalReplicas > 0) {
res.append(" (").append(
((float) (missingReplicas * 100) / (float) numExpectedReplicas)).append(
" %)");
}
if (decommissionedReplicas > 0) {
res.append("\n Decommissioned ec-blocks:\t").append(
decommissionedReplicas);
}
if (decommissioningReplicas > 0) {
res.append("\n Decommissioning ec-blocks:\t").append(
decommissioningReplicas);
}
return res.toString();
}
}
}
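
Note how each subclass picks its minimum: ReplicationResult passes the configured dfs.namenode.replication.min, while ErasureCodingResult passes ecSchema.getNumDataUnits(), since a group that drops below the data-unit count can no longer be reconstructed. A minimal sketch of that recoverability rule (the helper is hypothetical, not the actual BlockManager logic):

    // EcRecoverySketch.java - an RS block group survives while any numDataUnits
    // of its internal blocks survive; below that it is unrecoverable.
    public class EcRecoverySketch {
        static boolean recoverable(int liveInternalBlocks, int numDataUnits) {
            return liveInternalBlocks >= numDataUnits;
        }
        public static void main(String[] args) {
            System.out.println(recoverable(6, 6)); // true: exactly the data width
            System.out.println(recoverable(5, 6)); // false: data loss
        }
    }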

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -84,6 +84,8 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ReplicationResult;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ErasureCodingResult;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.io.IOUtils;
@@ -1058,13 +1060,14 @@ public void testFsckMissingReplicas() throws IOException {
final HdfsFileStatus file =
namenode.getRpcServer().getFileInfo(pathString);
assertNotNull(file);
Result res = new Result(conf);
fsck.check(pathString, file, res);
Result replRes = new ReplicationResult(conf);
Result ecRes = new ErasureCodingResult(conf);
fsck.check(pathString, file, replRes, ecRes);
// Also print the output from the fsck, for ex post facto sanity checks
System.out.println(result.toString());
assertEquals(res.missingReplicas,
assertEquals(replRes.missingReplicas,
(NUM_BLOCKS*REPL_FACTOR) - (NUM_BLOCKS*NUM_REPLICAS));
assertEquals(res.numExpectedReplicas, NUM_BLOCKS*REPL_FACTOR);
assertEquals(replRes.numExpectedReplicas, NUM_BLOCKS*REPL_FACTOR);
} finally {
if(dfs != null) {
dfs.close();
@@ -1135,10 +1138,11 @@ public void testFsckMisPlacedReplicas() throws IOException {
final HdfsFileStatus file =
namenode.getRpcServer().getFileInfo(pathString);
assertNotNull(file);
Result res = new Result(conf);
fsck.check(pathString, file, res);
Result replRes = new ReplicationResult(conf);
Result ecRes = new ErasureCodingResult(conf);
fsck.check(pathString, file, replRes, ecRes);
// check misReplicatedBlock number.
assertEquals(res.numMisReplicatedBlocks, NUM_BLOCKS);
assertEquals(replRes.numMisReplicatedBlocks, NUM_BLOCKS);
} finally {
if(dfs != null) {
dfs.close();
@@ -1199,14 +1203,15 @@ public void testFsckFileNotFound() throws Exception {
HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
blockSize, modTime, accessTime, perms, owner, group, symlink,
path, fileId, numChildren, null, storagePolicy, null, 0);
Result res = new Result(conf);
Result replRes = new ReplicationResult(conf);
Result ecRes = new ErasureCodingResult(conf);
try {
fsck.check(pathString, file, res);
fsck.check(pathString, file, replRes, ecRes);
} catch (Exception e) {
fail("Unexpected exception "+ e.getMessage());
}
assertTrue(res.toString().contains("HEALTHY"));
assertTrue(replRes.isHealthy());
}
/** Test fsck with symlinks in the filesystem */
@@ -1629,4 +1634,52 @@ public void testFsckWithDecommissionedReplicas() throws Exception {
}
}
}
}
@Test
public void testECFsck() throws Exception {
MiniDFSCluster cluster = null;
FileSystem fs = null;
try {
Configuration conf = new HdfsConfiguration();
final long precision = 1L;
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
int totalSize = ErasureCodingSchemaManager.getSystemDefaultSchema().getNumDataUnits()
+ ErasureCodingSchemaManager.getSystemDefaultSchema().getNumParityUnits();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(totalSize).build();
fs = cluster.getFileSystem();
Path replDirPath = new Path("/replicated");
Path replFilePath = new Path(replDirPath, "replfile");
final short factor = 3;
DFSTestUtil.createFile(fs, replFilePath, 1024, factor, 0);
DFSTestUtil.waitReplication(fs, replFilePath, factor);
Path ecDirPath = new Path("/striped");
Path ecFilePath = new Path(ecDirPath, "ecfile");
final int numBlocks = 4;
DFSTestUtil.createStripedFile(cluster, ecFilePath, ecDirPath, numBlocks, 2, true);
long replTime = fs.getFileStatus(replFilePath).getAccessTime();
long ecTime = fs.getFileStatus(ecFilePath).getAccessTime();
Thread.sleep(precision);
setupAuditLogs();
String outStr = runFsck(conf, 0, true, "/");
verifyAuditLogs();
assertEquals(replTime, fs.getFileStatus(replFilePath).getAccessTime());
assertEquals(ecTime, fs.getFileStatus(ecFilePath).getAccessTime());
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
if (fs != null) {try{fs.close();} catch(Exception e){}}
cluster.shutdown();
// restart the cluster; bring up namenode but not the data nodes
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0).format(false).build();
outStr = runFsck(conf, 1, true, "/");
// expect the result to be corrupt
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
System.out.println(outStr);
} finally {
if (fs != null) {try{fs.close();} catch(Exception e){}}
if (cluster != null) { cluster.shutdown(); }
}
}
}
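
The new testECFsck exercises both halves of the report: it brings up dataUnits + parityUnits datanodes (9 under the default schema), writes one replicated file and one striped file, and expects an overall HEALTHY verdict; it then restarts the cluster with zero datanodes and expects runFsck to return exit code 1 with a CORRUPT status.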