HDFS-15415. Reduce locking in Datanode DirectoryScanner. Contributed by Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Stephen O'Donnell 2020-09-28 12:43:25 -07:00 committed by Wei-Chiu Chuang
parent 947b0a154a
commit 71d006130e
1 changed file with 68 additions and 72 deletions

View File

@ -42,7 +42,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -406,88 +405,85 @@ public class DirectoryScanner implements Runnable {
clear(); clear();
Map<String, ScanInfo[]> diskReport = getDiskReport(); Map<String, ScanInfo[]> diskReport = getDiskReport();
// Hold FSDataset lock to prevent further changes to the block map for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) { String bpid = entry.getKey();
for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) { ScanInfo[] blockpoolReport = entry.getValue();
String bpid = entry.getKey();
ScanInfo[] blockpoolReport = entry.getValue();
Stats statsRecord = new Stats(bpid); Stats statsRecord = new Stats(bpid);
stats.put(bpid, statsRecord); stats.put(bpid, statsRecord);
LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>(); LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
synchronized(diffs) { synchronized(diffs) {
diffs.put(bpid, diffRecord); diffs.put(bpid, diffRecord);
} }
statsRecord.totalBlocks = blockpoolReport.length; statsRecord.totalBlocks = blockpoolReport.length;
final List<ReplicaInfo> bl = dataset.getSortedFinalizedBlocks(bpid); final List<ReplicaInfo> bl = dataset.getSortedFinalizedBlocks(bpid);
int d = 0; // index for blockpoolReport int d = 0; // index for blockpoolReport
int m = 0; // index for memReport int m = 0; // index for memReport
while (m < bl.size() && d < blockpoolReport.length) { while (m < bl.size() && d < blockpoolReport.length) {
ReplicaInfo memBlock = bl.get(m); ReplicaInfo memBlock = bl.get(m);
ScanInfo info = blockpoolReport[d]; ScanInfo info = blockpoolReport[d];
if (info.getBlockId() < memBlock.getBlockId()) { if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) { if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
// Block is missing in memory // Block is missing in memory
statsRecord.missingMemoryBlocks++; statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
continue;
}
if (info.getBlockId() > memBlock.getBlockId()) {
// Block is missing on the disk
addDifference(diffRecord, statsRecord,
memBlock.getBlockId(), info.getVolume());
m++;
continue;
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
if (info.getVolume().getStorageType() != StorageType.PROVIDED &&
info.getBlockFile() == null) {
// Block metadata file exists and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
|| info.getBlockLength() != memBlock.getNumBytes()) {
// Block metadata file is missing or has wrong generation stamp,
// or block file length is different than expected
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
} else if (memBlock.compareWith(info) != 0) {
// volumeMap record and on-disk files don't match.
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info); addDifference(diffRecord, statsRecord, info);
} }
d++; d++;
continue;
}
if (info.getBlockId() > memBlock.getBlockId()) {
// Block is missing on the disk
addDifference(diffRecord, statsRecord,
memBlock.getBlockId(), info.getVolume());
m++;
continue;
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
if (info.getVolume().getStorageType() != StorageType.PROVIDED &&
info.getBlockFile() == null) {
// Block metadata file exists and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
|| info.getBlockLength() != memBlock.getNumBytes()) {
// Block metadata file is missing or has wrong generation stamp,
// or block file length is different than expected
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
} else if (memBlock.compareWith(info) != 0) {
// volumeMap record and on-disk files don't match.
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
if (d < blockpoolReport.length) { if (d < blockpoolReport.length) {
// There may be multiple on-disk records for the same block, don't increment // There may be multiple on-disk records for the same block,
// the memory record pointer if so. // don't increment the memory record pointer if so.
ScanInfo nextInfo = blockpoolReport[d]; ScanInfo nextInfo = blockpoolReport[d];
if (nextInfo.getBlockId() != info.getBlockId()) { if (nextInfo.getBlockId() != info.getBlockId()) {
++m;
}
} else {
++m; ++m;
} }
} else {
++m;
} }
while (m < bl.size()) { }
ReplicaInfo current = bl.get(m++); while (m < bl.size()) {
addDifference(diffRecord, statsRecord, ReplicaInfo current = bl.get(m++);
current.getBlockId(), current.getVolume()); addDifference(diffRecord, statsRecord,
current.getBlockId(), current.getVolume());
}
while (d < blockpoolReport.length) {
if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, blockpoolReport[d]);
} }
while (d < blockpoolReport.length) { d++;
if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) { }
statsRecord.missingMemoryBlocks++; LOG.info(statsRecord.toString());
addDifference(diffRecord, statsRecord, blockpoolReport[d]); } //end for
}
d++;
}
LOG.info(statsRecord.toString());
} //end for
} //end synchronized
} }
/** /**