diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 284f1e6d95f..f0c6dd14e5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -645,6 +645,9 @@ Release 2.7.0 - UNRELEASED HDFS-7704. DN heartbeat to Active NN may be blocked and expire if connection to Standby NN continues to time out (Rushabh Shah via kihwal) + HDFS-7721. The HDFS BlockScanner may run fast during the first hour + (cmccabe) + BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS HDFS-7720. Quota by Storage Type API, tools and ClientNameNode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java index 781b4d39fda..ce0a8755f2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java @@ -127,6 +127,11 @@ public class VolumeScanner extends Thread { */ private boolean stopping = false; + /** + * The monotonic minute that the volume scanner was started on. + */ + private long startMinute = 0; + /** * The current minute, in monotonic terms. */ @@ -297,18 +302,18 @@ public class VolumeScanner extends Thread { private void expireOldScannedBytesRecords(long monotonicMs) { long newMinute = TimeUnit.MINUTES.convert(monotonicMs, TimeUnit.MILLISECONDS); - newMinute = newMinute % MINUTES_PER_HOUR; if (curMinute == newMinute) { return; } // If a minute or more has gone past since we last updated the scannedBytes // array, zero out the slots corresponding to those minutes. for (long m = curMinute + 1; m <= newMinute; m++) { - LOG.trace("{}: updateScannedBytes is zeroing out slot {}. " + - "curMinute = {}; newMinute = {}", this, m % MINUTES_PER_HOUR, - curMinute, newMinute); - scannedBytesSum -= scannedBytes[(int)(m % MINUTES_PER_HOUR)]; - scannedBytes[(int)(m % MINUTES_PER_HOUR)] = 0; + int slotIdx = (int)(m % MINUTES_PER_HOUR); + LOG.trace("{}: updateScannedBytes is zeroing out slotIdx {}. " + + "curMinute = {}; newMinute = {}", this, slotIdx, + curMinute, newMinute); + scannedBytesSum -= scannedBytes[slotIdx]; + scannedBytes[slotIdx] = 0; } curMinute = newMinute; } @@ -425,14 +430,28 @@ public class VolumeScanner extends Thread { } @VisibleForTesting - static boolean calculateShouldScan(long targetBytesPerSec, - long scannedBytesSum) { - long effectiveBytesPerSec = - scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR); + static boolean calculateShouldScan(String storageId, long targetBytesPerSec, + long scannedBytesSum, long startMinute, long curMinute) { + long runMinutes = curMinute - startMinute; + long effectiveBytesPerSec; + if (runMinutes <= 0) { + // avoid division by zero + effectiveBytesPerSec = scannedBytesSum; + } else { + if (runMinutes > MINUTES_PER_HOUR) { + // we only keep an hour's worth of rate information + runMinutes = MINUTES_PER_HOUR; + } + effectiveBytesPerSec = scannedBytesSum / + (SECONDS_PER_MINUTE * runMinutes); + } + boolean shouldScan = effectiveBytesPerSec <= targetBytesPerSec; - LOG.trace("calculateShouldScan: effectiveBytesPerSec = {}, and " + - "targetBytesPerSec = {}. shouldScan = {}", - effectiveBytesPerSec, targetBytesPerSec, shouldScan); + LOG.trace("{}: calculateShouldScan: effectiveBytesPerSec = {}, and " + + "targetBytesPerSec = {}. startMinute = {}, curMinute = {}, " + + "shouldScan = {}", + storageId, effectiveBytesPerSec, targetBytesPerSec, + startMinute, curMinute, shouldScan); return shouldScan; } @@ -450,7 +469,8 @@ public class VolumeScanner extends Thread { long monotonicMs = Time.monotonicNow(); expireOldScannedBytesRecords(monotonicMs); - if (!calculateShouldScan(conf.targetBytesPerSec, scannedBytesSum)) { + if (!calculateShouldScan(volume.getStorageID(), conf.targetBytesPerSec, + scannedBytesSum, startMinute, curMinute)) { // If neededBytesPerSec is too low, then wait few seconds for some old // scannedBytes records to expire. return 30000L; @@ -533,6 +553,10 @@ public class VolumeScanner extends Thread { @Override public void run() { + // Record the minute on which the scanner started. + this.startMinute = + TimeUnit.MINUTES.convert(Time.monotonicNow(), TimeUnit.MILLISECONDS); + this.curMinute = startMinute; try { LOG.trace("{}: thread starting.", this); resultHandler.setup(this); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java index 7eaa2bf2558..b727263e2a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java @@ -431,13 +431,6 @@ public class TestBlockScanner { info.shouldRun = true; info.notify(); } - Thread.sleep(5000); - synchronized (info) { - long endMs = Time.monotonicNow(); - // Should scan no more than one block a second. - long maxBlocksScanned = ((endMs + 999 - startMs) / 1000); - assertTrue(info.blocksScanned < maxBlocksScanned); - } GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { @@ -446,6 +439,17 @@ public class TestBlockScanner { } } }, 1, 30000); + Thread.sleep(2000); + synchronized (info) { + long endMs = Time.monotonicNow(); + // Should scan no more than one block a second. + long seconds = ((endMs + 999 - startMs) / 1000); + long maxBlocksScanned = seconds * 1; + assertTrue("The number of blocks scanned is too large. Scanned " + + info.blocksScanned + " blocks; only expected to scan at most " + + maxBlocksScanned + " in " + seconds + " seconds.", + info.blocksScanned <= maxBlocksScanned); + } ctx.close(); } @@ -657,24 +661,24 @@ public class TestBlockScanner { public void testCalculateNeededBytesPerSec() throws Exception { // If we didn't check anything the last hour, we should scan now. Assert.assertTrue( - VolumeScanner.calculateShouldScan(100, 0)); + VolumeScanner.calculateShouldScan("test", 100, 0, 0, 60)); // If, on average, we checked 101 bytes/s checked during the last hour, // stop checking now. - Assert.assertFalse( - VolumeScanner.calculateShouldScan(100, 101 * 3600)); + Assert.assertFalse(VolumeScanner. + calculateShouldScan("test", 100, 101 * 3600, 1000, 5000)); // Target is 1 byte / s, but we didn't scan anything in the last minute. // Should scan now. - Assert.assertTrue( - VolumeScanner.calculateShouldScan(1, 3540)); + Assert.assertTrue(VolumeScanner. + calculateShouldScan("test", 1, 3540, 0, 60)); // Target is 1000000 byte / s, but we didn't scan anything in the last // minute. Should scan now. - Assert.assertTrue( - VolumeScanner.calculateShouldScan(100000L, 354000000L)); + Assert.assertTrue(VolumeScanner. + calculateShouldScan("test", 100000L, 354000000L, 0, 60)); - Assert.assertFalse( - VolumeScanner.calculateShouldScan(100000L, 365000000L)); + Assert.assertFalse(VolumeScanner. + calculateShouldScan("test", 100000L, 365000000L, 0, 60)); } }