HDFS-8215. Refactor NamenodeFsck#check method. Contributed by Takanobu Asanuma

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-04-23 14:19:33 -07:00
parent 19262d99eb
commit ef4e9963b2
2 changed files with 107 additions and 60 deletions

View File

@ -464,6 +464,9 @@ Release 2.8.0 - UNRELEASED
HDFS-4448. Allow HA NN to start in secure mode with wildcard address
configured (atm via asuresh)
HDFS-8215. Refactor NamenodeFsck#check method. (Takanobu Asanuma
via szetszwo)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -428,36 +428,8 @@ private void listCorruptFileBlocks() throws IOException {
@VisibleForTesting
void check(String parent, HdfsFileStatus file, Result res) throws IOException {
String path = file.getFullName(parent);
boolean isOpen = false;
if (file.isDir()) {
if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
+ Path.SEPARATOR)
+ HdfsConstants.DOT_SNAPSHOT_DIR;
HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo(
snapshotPath);
check(snapshotPath, snapshotFileInfo, res);
}
byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
DirectoryListing thisListing;
if (showFiles) {
out.println(path + " <dir>");
}
res.totalDirs++;
do {
assert lastReturnedName != null;
thisListing = namenode.getRpcServer().getListing(
path, lastReturnedName, false);
if (thisListing == null) {
return;
}
HdfsFileStatus[] files = thisListing.getPartialListing();
for (int i = 0; i < files.length; i++) {
check(path, files[i], res);
}
lastReturnedName = thisListing.getLastName();
} while (thisListing.hasMore());
checkDir(path, res);
return;
}
if (file.isSymlink()) {
@ -467,9 +439,47 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
res.totalSymlinks++;
return;
}
LocatedBlocks blocks = getBlockLocations(path, file);
if (blocks == null) { // the file is deleted
return;
}
collectFileSummary(path, file, res, blocks);
collectBlocksSummary(parent, file, res, blocks);
}
private void checkDir(String path, Result res) throws IOException {
if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
+ Path.SEPARATOR)
+ HdfsConstants.DOT_SNAPSHOT_DIR;
HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo(
snapshotPath);
check(snapshotPath, snapshotFileInfo, res);
}
byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
DirectoryListing thisListing;
if (showFiles) {
out.println(path + " <dir>");
}
res.totalDirs++;
do {
assert lastReturnedName != null;
thisListing = namenode.getRpcServer().getListing(
path, lastReturnedName, false);
if (thisListing == null) {
return;
}
HdfsFileStatus[] files = thisListing.getPartialListing();
for (int i = 0; i < files.length; i++) {
check(path, files[i], res);
}
lastReturnedName = thisListing.getLastName();
} while (thisListing.hasMore());
}
private LocatedBlocks getBlockLocations(String path, HdfsFileStatus file)
throws IOException {
long fileLen = file.getLen();
// Get block locations without updating the file access time
// and without block access tokens
LocatedBlocks blocks = null;
FSNamesystem fsn = namenode.getNamesystem();
fsn.readLock();
@ -480,10 +490,13 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
} finally {
fsn.readUnlock();
}
if (blocks == null) { // the file is deleted
return;
}
isOpen = blocks.isUnderConstruction();
return blocks;
}
private void collectFileSummary(String path, HdfsFileStatus file, Result res,
LocatedBlocks blocks) throws IOException {
long fileLen = file.getLen();
boolean isOpen = blocks.isUnderConstruction();
if (isOpen && !showOpenFiles) {
// We collect these stats about open files to report with default options
res.totalOpenFilesSize += fileLen;
@ -507,57 +520,67 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
out.println();
out.flush();
}
}
private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res,
LocatedBlocks blocks) throws IOException {
String path = file.getFullName(parent);
boolean isOpen = blocks.isUnderConstruction();
int missing = 0;
int corrupt = 0;
long missize = 0;
int underReplicatedPerFile = 0;
int misReplicatedPerFile = 0;
StringBuilder report = new StringBuilder();
int i = 0;
int blockNumber = 0;
for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
ExtendedBlock block = lBlk.getBlock();
boolean isCorrupt = lBlk.isCorrupt();
String blkName = block.toString();
BlockManager bm = namenode.getNamesystem().getBlockManager();
// count decommissionedReplicas / decommissioningReplicas
NumberReplicas numberReplicas = bm.countNodes(block.getLocalBlock());
int liveReplicas = numberReplicas.liveReplicas();
int decommissionedReplicas = numberReplicas.decommissioned();;
int decommissioningReplicas = numberReplicas.decommissioning();
res.decommissionedReplicas += decommissionedReplicas;
res.decommissioningReplicas += decommissioningReplicas;
int totalReplicas = liveReplicas + decommissionedReplicas +
// count total replicas
int liveReplicas = numberReplicas.liveReplicas();
int totalReplicasPerBlock = liveReplicas + decommissionedReplicas +
decommissioningReplicas;
res.totalReplicas += totalReplicas;
Collection<DatanodeDescriptor> corruptReplicas = null;
if (showReplicaDetails) {
corruptReplicas = bm.getCorruptReplicas(block.getLocalBlock());
}
res.totalReplicas += totalReplicasPerBlock;
// count expected replicas
short targetFileReplication = file.getReplication();
res.numExpectedReplicas += targetFileReplication;
if(totalReplicas < minReplication){
// count under min repl'd blocks
if(totalReplicasPerBlock < minReplication){
res.numUnderMinReplicatedBlocks++;
}
// count excessive Replicas / over replicated blocks
if (liveReplicas > targetFileReplication) {
res.excessiveReplicas += (liveReplicas - targetFileReplication);
res.numOverReplicatedBlocks += 1;
}
//keep track of storage tier counts
if (this.showStoragePolcies && lBlk.getStorageTypes() != null) {
StorageType[] storageTypes = lBlk.getStorageTypes();
storageTypeSummary.add(Arrays.copyOf(storageTypes, storageTypes.length),
fsn.getBlockManager().getStoragePolicy(file.getStoragePolicy()));
}
// Check if block is Corrupt
// count corrupt blocks
boolean isCorrupt = lBlk.isCorrupt();
if (isCorrupt) {
corrupt++;
res.corruptBlocks++;
out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() +
" block " + block.getBlockName()+"\n");
}
if (totalReplicas >= minReplication)
// count minimally replicated blocks
if (totalReplicasPerBlock >= minReplication)
res.numMinReplicatedBlocks++;
if (totalReplicas < targetFileReplication && totalReplicas > 0) {
res.missingReplicas += (targetFileReplication - totalReplicas);
// count missing replicas / under replicated blocks
if (totalReplicasPerBlock < targetFileReplication && totalReplicasPerBlock > 0) {
res.missingReplicas += (targetFileReplication - totalReplicasPerBlock);
res.numUnderReplicatedBlocks += 1;
underReplicatedPerFile++;
if (!showFiles) {
@ -570,7 +593,8 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
decommissionedReplicas + " decommissioned replica(s) and " +
decommissioningReplicas + " decommissioning replica(s).");
}
// verify block placement policy
// count mis replicated blocks block
BlockPlacementStatus blockPlacementStatus = bpPolicy
.verifyBlockPlacement(path, lBlk, targetFileReplication);
if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
@ -584,8 +608,16 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
out.println(" Replica placement policy is violated for " +
block + ". " + blockPlacementStatus.getErrorDescription());
}
report.append(i + ". " + blkName + " len=" + block.getNumBytes());
if (totalReplicas == 0) {
// count storage summary
if (this.showStoragePolcies && lBlk.getStorageTypes() != null) {
countStorageTypeSummary(file, lBlk);
}
// report
String blkName = block.toString();
report.append(blockNumber + ". " + blkName + " len=" + block.getNumBytes());
if (totalReplicasPerBlock == 0) {
report.append(" MISSING!");
res.addMissing(block.toString(), block.getNumBytes());
missing++;
@ -607,6 +639,8 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
if (showReplicaDetails) {
LightWeightLinkedSet<Block> blocksExcess =
bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
Collection<DatanodeDescriptor> corruptReplicas =
bm.getCorruptReplicas(block.getLocalBlock());
sb.append("(");
if (dnDesc.isDecommissioned()) {
sb.append("DECOMMISSIONED)");
@ -633,8 +667,10 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
}
}
report.append('\n');
i++;
blockNumber++;
}
// count corrupt file & move or delete if necessary
if ((missing > 0) || (corrupt > 0)) {
if (!showFiles && (missing > 0)) {
out.print("\n" + path + ": MISSING " + missing
@ -648,6 +684,7 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
if (doDelete) deleteCorruptedFile(path);
}
}
if (showFiles) {
if (missing > 0) {
out.print(" MISSING " + missing + " blocks of total size " + missize + " B\n");
@ -660,6 +697,13 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
}
}
private void countStorageTypeSummary(HdfsFileStatus file, LocatedBlock lBlk) {
StorageType[] storageTypes = lBlk.getStorageTypes();
storageTypeSummary.add(Arrays.copyOf(storageTypes, storageTypes.length),
namenode.getNamesystem().getBlockManager()
.getStoragePolicy(file.getStoragePolicy()));
}
private void deleteCorruptedFile(String path) {
try {
namenode.getRpcServer().delete(path, true);