HDFS-8215. Refactor NamenodeFsck#check method. Contributed by Takanobu Asanuma

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-04-23 14:19:33 -07:00
parent a6a5d1d6b5
commit f27a976a2f
2 changed files with 107 additions and 60 deletions

View File

@@ -146,6 +146,9 @@ Release 2.8.0 - UNRELEASED
HDFS-4448. Allow HA NN to start in secure mode with wildcard address
configured (atm via asuresh)
HDFS-8215. Refactor NamenodeFsck#check method. (Takanobu Asanuma
via szetszwo)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@@ -426,36 +426,8 @@ private void listCorruptFileBlocks() throws IOException {
@VisibleForTesting
void check(String parent, HdfsFileStatus file, Result res) throws IOException {
String path = file.getFullName(parent);
boolean isOpen = false;
if (file.isDir()) {
if (snapshottableDirs != null && snapshottableDirs.contains(path)) { checkDir(path, res);
String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
+ Path.SEPARATOR)
+ HdfsConstants.DOT_SNAPSHOT_DIR;
HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo(
snapshotPath);
check(snapshotPath, snapshotFileInfo, res);
}
byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
DirectoryListing thisListing;
if (showFiles) {
out.println(path + " <dir>");
}
res.totalDirs++;
do {
assert lastReturnedName != null;
thisListing = namenode.getRpcServer().getListing(
path, lastReturnedName, false);
if (thisListing == null) {
return;
}
HdfsFileStatus[] files = thisListing.getPartialListing();
for (int i = 0; i < files.length; i++) {
check(path, files[i], res);
}
lastReturnedName = thisListing.getLastName();
} while (thisListing.hasMore());
return;
}
if (file.isSymlink()) {
@@ -465,9 +437,47 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
res.totalSymlinks++;
return;
}
LocatedBlocks blocks = getBlockLocations(path, file);
if (blocks == null) { // the file is deleted
return;
}
collectFileSummary(path, file, res, blocks);
collectBlocksSummary(parent, file, res, blocks);
}
private void checkDir(String path, Result res) throws IOException {
if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
+ Path.SEPARATOR)
+ HdfsConstants.DOT_SNAPSHOT_DIR;
HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo(
snapshotPath);
check(snapshotPath, snapshotFileInfo, res);
}
byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
DirectoryListing thisListing;
if (showFiles) {
out.println(path + " <dir>");
}
res.totalDirs++;
do {
assert lastReturnedName != null;
thisListing = namenode.getRpcServer().getListing(
path, lastReturnedName, false);
if (thisListing == null) {
return;
}
HdfsFileStatus[] files = thisListing.getPartialListing();
for (int i = 0; i < files.length; i++) {
check(path, files[i], res);
}
lastReturnedName = thisListing.getLastName();
} while (thisListing.hasMore());
}
private LocatedBlocks getBlockLocations(String path, HdfsFileStatus file)
throws IOException {
long fileLen = file.getLen();
// Get block locations without updating the file access time
// and without block access tokens
LocatedBlocks blocks = null;
FSNamesystem fsn = namenode.getNamesystem();
fsn.readLock();
@@ -478,10 +488,13 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
} finally {
fsn.readUnlock();
}
if (blocks == null) { // the file is deleted return blocks;
return; }
}
isOpen = blocks.isUnderConstruction(); private void collectFileSummary(String path, HdfsFileStatus file, Result res,
LocatedBlocks blocks) throws IOException {
long fileLen = file.getLen();
boolean isOpen = blocks.isUnderConstruction();
if (isOpen && !showOpenFiles) {
// We collect these stats about open files to report with default options
res.totalOpenFilesSize += fileLen;
@@ -502,57 +515,67 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
out.print('.');
}
if (res.totalFiles % 100 == 0) { out.println(); out.flush(); }
}
private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res,
LocatedBlocks blocks) throws IOException {
String path = file.getFullName(parent);
boolean isOpen = blocks.isUnderConstruction();
int missing = 0;
int corrupt = 0;
long missize = 0;
int underReplicatedPerFile = 0;
int misReplicatedPerFile = 0;
StringBuilder report = new StringBuilder();
int i = 0; int blockNumber = 0;
for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
ExtendedBlock block = lBlk.getBlock();
boolean isCorrupt = lBlk.isCorrupt();
String blkName = block.toString();
BlockManager bm = namenode.getNamesystem().getBlockManager();
// count decommissionedReplicas / decommissioningReplicas
NumberReplicas numberReplicas = bm.countNodes(block.getLocalBlock());
int liveReplicas = numberReplicas.liveReplicas();
int decommissionedReplicas = numberReplicas.decommissioned();;
int decommissioningReplicas = numberReplicas.decommissioning();
res.decommissionedReplicas += decommissionedReplicas;
res.decommissioningReplicas += decommissioningReplicas;
int totalReplicas = liveReplicas + decommissionedReplicas +
// count total replicas
int liveReplicas = numberReplicas.liveReplicas();
int totalReplicasPerBlock = liveReplicas + decommissionedReplicas +
decommissioningReplicas;
res.totalReplicas += totalReplicas; res.totalReplicas += totalReplicasPerBlock;
Collection<DatanodeDescriptor> corruptReplicas = null;
if (showReplicaDetails) { // count expected replicas
corruptReplicas = bm.getCorruptReplicas(block.getLocalBlock());
}
short targetFileReplication = file.getReplication();
res.numExpectedReplicas += targetFileReplication;
if(totalReplicas < minReplication){
// count under min repl'd blocks
if(totalReplicasPerBlock < minReplication){
res.numUnderMinReplicatedBlocks++;
}
// count excessive Replicas / over replicated blocks
if (liveReplicas > targetFileReplication) {
res.excessiveReplicas += (liveReplicas - targetFileReplication);
res.numOverReplicatedBlocks += 1;
}
//keep track of storage tier counts
if (this.showStoragePolcies && lBlk.getStorageTypes() != null) { // count corrupt blocks
StorageType[] storageTypes = lBlk.getStorageTypes(); boolean isCorrupt = lBlk.isCorrupt();
storageTypeSummary.add(Arrays.copyOf(storageTypes, storageTypes.length),
fsn.getBlockManager().getStoragePolicy(file.getStoragePolicy()));
}
// Check if block is Corrupt
if (isCorrupt) {
corrupt++;
res.corruptBlocks++;
out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() +
" block " + block.getBlockName()+"\n");
}
if (totalReplicas >= minReplication)
// count minimally replicated blocks
if (totalReplicasPerBlock >= minReplication)
res.numMinReplicatedBlocks++;
if (totalReplicas < targetFileReplication && totalReplicas > 0) {
res.missingReplicas += (targetFileReplication - totalReplicas); // count missing replicas / under replicated blocks
if (totalReplicasPerBlock < targetFileReplication && totalReplicasPerBlock > 0) {
res.missingReplicas += (targetFileReplication - totalReplicasPerBlock);
res.numUnderReplicatedBlocks += 1;
underReplicatedPerFile++;
if (!showFiles) {
@@ -565,7 +588,8 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
decommissionedReplicas + " decommissioned replica(s) and " +
decommissioningReplicas + " decommissioning replica(s).");
}
// verify block placement policy
// count mis replicated blocks block
BlockPlacementStatus blockPlacementStatus = bpPolicy
.verifyBlockPlacement(path, lBlk, targetFileReplication);
if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
@@ -579,8 +603,16 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
out.println(" Replica placement policy is violated for " +
block + ". " + blockPlacementStatus.getErrorDescription());
}
report.append(i + ". " + blkName + " len=" + block.getNumBytes());
if (totalReplicas == 0) { // count storage summary
if (this.showStoragePolcies && lBlk.getStorageTypes() != null) {
countStorageTypeSummary(file, lBlk);
}
// report
String blkName = block.toString();
report.append(blockNumber + ". " + blkName + " len=" + block.getNumBytes());
if (totalReplicasPerBlock == 0) {
report.append(" MISSING!");
res.addMissing(block.toString(), block.getNumBytes());
missing++;
@@ -602,6 +634,8 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
if (showReplicaDetails) {
LightWeightLinkedSet<Block> blocksExcess =
bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
Collection<DatanodeDescriptor> corruptReplicas =
bm.getCorruptReplicas(block.getLocalBlock());
sb.append("(");
if (dnDesc.isDecommissioned()) {
sb.append("DECOMMISSIONED)");
@@ -628,8 +662,10 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
}
}
report.append('\n');
i++; blockNumber++;
}
// count corrupt file & move or delete if necessary
if ((missing > 0) || (corrupt > 0)) {
if (!showFiles && (missing > 0)) {
out.print("\n" + path + ": MISSING " + missing
@@ -643,6 +679,7 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
if (doDelete) deleteCorruptedFile(path);
}
}
if (showFiles) {
if (missing > 0) {
out.print(" MISSING " + missing + " blocks of total size " + missize + " B\n");
@@ -655,6 +692,13 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
}
}
private void countStorageTypeSummary(HdfsFileStatus file, LocatedBlock lBlk) {
StorageType[] storageTypes = lBlk.getStorageTypes();
storageTypeSummary.add(Arrays.copyOf(storageTypes, storageTypes.length),
namenode.getNamesystem().getBlockManager()
.getStoragePolicy(file.getStoragePolicy()));
}
private void deleteCorruptedFile(String path) {
try {
namenode.getRpcServer().delete(path, true);