diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index 746df6908f8..34cfaef3303 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -311,4 +311,9 @@ - + + + + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index 40a4cb9b6a4..dc941a5ed8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -66,7 +66,7 @@ public class DirectoryScanner implements Runnable { + " starting at %s with interval of %dms"; private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE + " and throttle limit of %dms/s"; - + private static final int RECONCILE_BLOCKS_BATCH_SIZE = 1000; private final FsDatasetSpi dataset; private final ExecutorService reportCompileThreadPool; private final ScheduledExecutorService masterThread; @@ -298,7 +298,9 @@ public class DirectoryScanner implements Runnable { * Clear the current cache of diffs and statistics. */ private void clear() { - diffs.clear(); + synchronized (diffs) { + diffs.clear(); + } stats.clear(); } @@ -371,13 +373,25 @@ public class DirectoryScanner implements Runnable { */ @VisibleForTesting public void reconcile() throws IOException { + LOG.debug("reconcile start DirectoryScanning"); scan(); - for (Entry> entry : diffs.entrySet()) { - String bpid = entry.getKey(); - LinkedList diff = entry.getValue(); - - for (ScanInfo info : diff) { - dataset.checkAndUpdate(bpid, info); + int loopCount = 0; + synchronized (diffs) { + for (Entry> entry : diffs.entrySet()) { + String bpid = entry.getKey(); + LinkedList diff = entry.getValue(); + + for (ScanInfo info : diff) { + dataset.checkAndUpdate(bpid, info); + if (loopCount % RECONCILE_BLOCKS_BATCH_SIZE == 0) { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // do nothing + } + } + loopCount++; + } } } if (!retainDiffs) clear(); @@ -400,7 +414,9 @@ public class DirectoryScanner implements Runnable { Stats statsRecord = new Stats(bpid); stats.put(bpid, statsRecord); LinkedList diffRecord = new LinkedList(); - diffs.put(bpid, diffRecord); + synchronized(diffs) { + diffs.put(bpid, diffRecord); + } statsRecord.totalBlocks = blockpoolReport.length; final List bl = dataset.getSortedFinalizedBlocks(bpid);