HDFS-15583. Backport DirectoryScanner improvements HDFS-14476, HDFS-14751 and HDFS-15048 to branch 3.2 and 3.1. Contributed by Stephen O'Donnell
(cherry picked from commit 5f34e3214e
)
This commit is contained in:
parent
06ff4d1416
commit
27b2c1ea7b
|
@ -311,4 +311,9 @@
|
|||
<Method name="setInteractiveFormat" />
|
||||
<Bug pattern="ME_ENUM_FIELD_SETTER" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.hdfs.server.datanode.DirectoryScanner" />
|
||||
<Method name="reconcile" />
|
||||
<Bug pattern="SWL_SLEEP_WITH_LOCK_HELD" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
|
|
|
@ -66,7 +66,7 @@ public class DirectoryScanner implements Runnable {
|
|||
+ " starting at %s with interval of %dms";
|
||||
private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
|
||||
+ " and throttle limit of %dms/s";
|
||||
|
||||
private static final int RECONCILE_BLOCKS_BATCH_SIZE = 1000;
|
||||
private final FsDatasetSpi<?> dataset;
|
||||
private final ExecutorService reportCompileThreadPool;
|
||||
private final ScheduledExecutorService masterThread;
|
||||
|
@ -298,7 +298,9 @@ public class DirectoryScanner implements Runnable {
|
|||
* Clear the current cache of diffs and statistics.
|
||||
*/
|
||||
private void clear() {
|
||||
diffs.clear();
|
||||
synchronized (diffs) {
|
||||
diffs.clear();
|
||||
}
|
||||
stats.clear();
|
||||
}
|
||||
|
||||
|
@ -371,13 +373,25 @@ public class DirectoryScanner implements Runnable {
|
|||
*/
|
||||
@VisibleForTesting
|
||||
public void reconcile() throws IOException {
|
||||
LOG.debug("reconcile start DirectoryScanning");
|
||||
scan();
|
||||
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
|
||||
String bpid = entry.getKey();
|
||||
LinkedList<ScanInfo> diff = entry.getValue();
|
||||
|
||||
for (ScanInfo info : diff) {
|
||||
dataset.checkAndUpdate(bpid, info);
|
||||
int loopCount = 0;
|
||||
synchronized (diffs) {
|
||||
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
|
||||
String bpid = entry.getKey();
|
||||
LinkedList<ScanInfo> diff = entry.getValue();
|
||||
|
||||
for (ScanInfo info : diff) {
|
||||
dataset.checkAndUpdate(bpid, info);
|
||||
if (loopCount % RECONCILE_BLOCKS_BATCH_SIZE == 0) {
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException e) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
loopCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!retainDiffs) clear();
|
||||
|
@ -400,7 +414,9 @@ public class DirectoryScanner implements Runnable {
|
|||
Stats statsRecord = new Stats(bpid);
|
||||
stats.put(bpid, statsRecord);
|
||||
LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
|
||||
diffs.put(bpid, diffRecord);
|
||||
synchronized(diffs) {
|
||||
diffs.put(bpid, diffRecord);
|
||||
}
|
||||
|
||||
statsRecord.totalBlocks = blockpoolReport.length;
|
||||
final List<ReplicaInfo> bl = dataset.getSortedFinalizedBlocks(bpid);
|
||||
|
|
Loading…
Reference in New Issue