diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java index b98892332ed..20e2965c0d8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java @@ -47,6 +47,8 @@ public final class Time { * milliseconds, and not affected by settimeofday or similar system clock * changes. This is appropriate to use when computing how much longer to * wait for an interval to expire. + * This function can return a negative value and it must be handled correctly + * by callers. See the documentation of System#nanoTime for caveats. * @return a monotonic clock that counts in milliseconds. */ public static long monotonicNow() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6951a089459..e07e45dc907 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -566,6 +566,9 @@ Release 2.7.1 - UNRELEASED HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor goes for infinite loop (vinayakumarb) + HDFS-8163. Using monotonicNow for block report scheduling causes + test failures on recently restarted systems. (Arpit Agarwal) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 1b42b19af8b..92323f1530b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -429,7 +429,7 @@ class BPOfferService { */ void scheduleBlockReport(long delay) { for (BPServiceActor actor : bpServices) { - actor.scheduleBlockReport(delay); + actor.getScheduler().scheduleBlockReport(delay); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index ba222259b57..5bc505f8ec7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.Time; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionUtil; @@ -82,19 +83,11 @@ class BPServiceActor implements Runnable { final BPOfferService bpos; - // lastBlockReport and lastHeartbeat may be assigned/read - // by testing threads (through BPServiceActor#triggerXXX), while also - // assigned/read by the actor thread. Thus they should be declared as volatile - // to make sure the "happens-before" consistency. - volatile long lastBlockReport = 0; - - boolean resetBlockReportTime = true; - volatile long lastCacheReport = 0; + private final Scheduler scheduler; Thread bpThread; DatanodeProtocolClientSideTranslatorPB bpNamenode; - private volatile long lastHeartbeat = 0; static enum RunningState { CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED; @@ -130,6 +123,7 @@ class BPServiceActor implements Runnable { this.nnAddr = nnAddr; this.dnConf = dn.getDnConf(); prevBlockReportId = DFSUtil.getRandom().nextLong(); + scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval); } boolean isAlive() { @@ -232,33 +226,6 @@ class BPServiceActor implements Runnable { register(nsInfo); } - // This is useful to make sure NN gets Heartbeat before Blockreport - // upon NN restart while DN keeps retrying Otherwise, - // 1. NN restarts. - // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister. - // 3. After reregistration completes, DN will send Blockreport first. - // 4. Given NN receives Blockreport after Heartbeat, it won't mark - // DatanodeStorageInfo#blockContentsStale to false until the next - // Blockreport. - void scheduleHeartbeat() { - lastHeartbeat = 0; - } - - /** - * This methods arranges for the data node to send the block report at - * the next heartbeat. - */ - void scheduleBlockReport(long delay) { - if (delay > 0) { // send BR after random delay - lastBlockReport = monotonicNow() - - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); - } else { // send at next heartbeat - lastBlockReport = lastHeartbeat - dnConf.blockReportInterval; - } - resetBlockReportTime = true; // reset future BRs for randomness - } - - /** * Report received blocks and delete hints to the Namenode for each * storage. @@ -387,10 +354,10 @@ class BPServiceActor implements Runnable { @VisibleForTesting void triggerBlockReportForTests() { synchronized (pendingIncrementalBRperStorage) { - lastBlockReport = 0; - lastHeartbeat = 0; + scheduler.scheduleHeartbeat(); + long nextBlockReportTime = scheduler.scheduleBlockReport(0); pendingIncrementalBRperStorage.notifyAll(); - while (lastBlockReport == 0) { + while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) { try { pendingIncrementalBRperStorage.wait(100); } catch (InterruptedException e) { @@ -403,9 +370,9 @@ class BPServiceActor implements Runnable { @VisibleForTesting void triggerHeartbeatForTests() { synchronized (pendingIncrementalBRperStorage) { - lastHeartbeat = 0; + final long nextHeartbeatTime = scheduler.scheduleHeartbeat(); pendingIncrementalBRperStorage.notifyAll(); - while (lastHeartbeat == 0) { + while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) { try { pendingIncrementalBRperStorage.wait(100); } catch (InterruptedException e) { @@ -454,8 +421,7 @@ class BPServiceActor implements Runnable { */ List blockReport() throws IOException { // send block report if timer has expired. - final long startTime = monotonicNow(); - if (startTime - lastBlockReport <= dnConf.blockReportInterval) { + if (!scheduler.isBlockReportDue()) { return null; } @@ -536,29 +502,10 @@ class BPServiceActor implements Runnable { (nCmds + " commands: " + Joiner.on("; ").join(cmds)))) + "."); } - scheduleNextBlockReport(startTime); + scheduler.scheduleNextBlockReport(); return cmds.size() == 0 ? null : cmds; } - private void scheduleNextBlockReport(long previousReportStartTime) { - // If we have sent the first set of block reports, then wait a random - // time before we start the periodic block reports. - if (resetBlockReportTime) { - lastBlockReport = previousReportStartTime - - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval)); - resetBlockReportTime = false; - } else { - /* say the last block report was at 8:20:14. The current report - * should have started around 9:20:14 (default 1 hour interval). - * If current time is : - * 1) normal like 9:20:18, next report should be at 10:20:14 - * 2) unexpected like 11:35:43, next report should be at 12:20:14 - */ - lastBlockReport += (monotonicNow() - lastBlockReport) / - dnConf.blockReportInterval * dnConf.blockReportInterval; - } - } - DatanodeCommand cacheReport() throws IOException { // If caching is disabled, do not send a cache report if (dn.getFSDataset().getCacheCapacity() == 0) { @@ -686,13 +633,12 @@ class BPServiceActor implements Runnable { // while (shouldRun()) { try { - final long startTime = monotonicNow(); + final long startTime = scheduler.monotonicNow(); // // Every so often, send heartbeat or block-report // - boolean sendHeartbeat = - startTime - lastHeartbeat >= dnConf.heartBeatInterval; + final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime); if (sendHeartbeat) { // // All heartbeat messages include following info: @@ -701,11 +647,11 @@ class BPServiceActor implements Runnable { // -- Total capacity // -- Bytes remaining // - lastHeartbeat = startTime; + scheduler.scheduleNextHeartbeat(); if (!dn.areHeartbeatsDisabledForTests()) { HeartbeatResponse resp = sendHeartBeat(); assert resp != null; - dn.getMetrics().addHeartbeat(monotonicNow() - startTime); + dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime); // If the state of this NN has changed (eg STANDBY->ACTIVE) // then let the BPOfferService update itself. @@ -746,8 +692,7 @@ class BPServiceActor implements Runnable { // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // - long waitTime = dnConf.heartBeatInterval - - (monotonicNow() - lastHeartbeat); + long waitTime = scheduler.getHeartbeatWaitTime(); synchronized(pendingIncrementalBRperStorage) { if (waitTime > 0 && !sendImmediateIBR) { try { @@ -820,7 +765,7 @@ class BPServiceActor implements Runnable { bpos.registrationSucceeded(this, bpRegistration); // random short delay - helps scatter the BR from all DNs - scheduleBlockReport(dnConf.initialBlockReportDelay); + scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay); } @@ -935,7 +880,7 @@ class BPServiceActor implements Runnable { NamespaceInfo nsInfo = retrieveNamespaceInfo(); // and re-register register(nsInfo); - scheduleHeartbeat(); + scheduler.scheduleHeartbeat(); } } @@ -1013,7 +958,7 @@ class BPServiceActor implements Runnable { } else { LOG.info(bpos.toString() + ": scheduling a full block report."); synchronized(pendingIncrementalBRperStorage) { - lastBlockReport = 0; + scheduler.scheduleBlockReport(0); pendingIncrementalBRperStorage.notifyAll(); } } @@ -1044,4 +989,116 @@ class BPServiceActor implements Runnable { } } } + + Scheduler getScheduler() { + return scheduler; + } + + /** + * Utility class that wraps the timestamp computations for scheduling + * heartbeats and block reports. + */ + static class Scheduler { + // nextBlockReportTime and nextHeartbeatTime may be assigned/read + // by testing threads (through BPServiceActor#triggerXXX), while also + // assigned/read by the actor thread. + @VisibleForTesting + volatile long nextBlockReportTime = monotonicNow(); + + @VisibleForTesting + volatile long nextHeartbeatTime = monotonicNow(); + + @VisibleForTesting + boolean resetBlockReportTime = true; + + private final long heartbeatIntervalMs; + private final long blockReportIntervalMs; + + Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) { + this.heartbeatIntervalMs = heartbeatIntervalMs; + this.blockReportIntervalMs = blockReportIntervalMs; + } + + // This is useful to make sure NN gets Heartbeat before Blockreport + // upon NN restart while DN keeps retrying Otherwise, + // 1. NN restarts. + // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister. + // 3. After reregistration completes, DN will send Blockreport first. + // 4. Given NN receives Blockreport after Heartbeat, it won't mark + // DatanodeStorageInfo#blockContentsStale to false until the next + // Blockreport. + long scheduleHeartbeat() { + nextHeartbeatTime = monotonicNow(); + return nextHeartbeatTime; + } + + long scheduleNextHeartbeat() { + // Numerical overflow is possible here and is okay. + nextHeartbeatTime += heartbeatIntervalMs; + return nextHeartbeatTime; + } + + boolean isHeartbeatDue(long startTime) { + return (nextHeartbeatTime - startTime <= 0); + } + + boolean isBlockReportDue() { + return nextBlockReportTime - monotonicNow() <= 0; + } + + /** + * This methods arranges for the data node to send the block report at + * the next heartbeat. + */ + long scheduleBlockReport(long delay) { + if (delay > 0) { // send BR after random delay + // Numerical overflow is possible here and is okay. + nextBlockReportTime = + monotonicNow() + DFSUtil.getRandom().nextInt((int) (delay)); + } else { // send at next heartbeat + nextBlockReportTime = monotonicNow(); + } + resetBlockReportTime = true; // reset future BRs for randomness + return nextBlockReportTime; + } + + /** + * Schedule the next block report after the block report interval. If the + * current block report was delayed then the next block report is sent per + * the original schedule. + * Numerical overflow is possible here. + */ + void scheduleNextBlockReport() { + // If we have sent the first set of block reports, then wait a random + // time before we start the periodic block reports. + if (resetBlockReportTime) { + nextBlockReportTime = monotonicNow() + + DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs)); + resetBlockReportTime = false; + } else { + /* say the last block report was at 8:20:14. The current report + * should have started around 9:20:14 (default 1 hour interval). + * If current time is : + * 1) normal like 9:20:18, next report should be at 10:20:14 + * 2) unexpected like 11:35:43, next report should be at 12:20:14 + */ + nextBlockReportTime += + (((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) / + blockReportIntervalMs)) * blockReportIntervalMs; + } + } + + long getHeartbeatWaitTime() { + return nextHeartbeatTime - monotonicNow(); + } + + /** + * Wrapped for testing. + * @return + */ + @VisibleForTesting + public long monotonicNow() { + return Time.monotonicNow(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java new file mode 100644 index 00000000000..0d7484c8667 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import static java.lang.Math.abs; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + + +/** + * Verify the block report and heartbeat scheduling logic of BPServiceActor + * using a few different values . + */ +public class TestBpServiceActorScheduler { + protected static final Log LOG = LogFactory.getLog(TestBpServiceActorScheduler.class); + + @Rule + public Timeout timeout = new Timeout(300000); + + private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds + private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds + private final Random random = new Random(System.nanoTime()); + + @Test + public void testInit() { + for (final long now : getTimestamps()) { + Scheduler scheduler = makeMockScheduler(now); + assertTrue(scheduler.isHeartbeatDue(now)); + assertTrue(scheduler.isBlockReportDue()); + } + } + + @Test + public void testScheduleBlockReportImmediate() { + for (final long now : getTimestamps()) { + Scheduler scheduler = makeMockScheduler(now); + scheduler.scheduleBlockReport(0); + assertTrue(scheduler.resetBlockReportTime); + assertThat(scheduler.nextBlockReportTime, is(now)); + } + } + + @Test + public void testScheduleBlockReportDelayed() { + for (final long now : getTimestamps()) { + Scheduler scheduler = makeMockScheduler(now); + final long delayMs = 10; + scheduler.scheduleBlockReport(delayMs); + assertTrue(scheduler.resetBlockReportTime); + assertTrue(scheduler.nextBlockReportTime - now >= 0); + assertTrue(scheduler.nextBlockReportTime - (now + delayMs) < 0); + } + } + + /** + * If resetBlockReportTime is true then the next block report must be scheduled + * in the range [now, now + BLOCK_REPORT_INTERVAL_SEC). + */ + @Test + public void testScheduleNextBlockReport() { + for (final long now : getTimestamps()) { + Scheduler scheduler = makeMockScheduler(now); + assertTrue(scheduler.resetBlockReportTime); + scheduler.scheduleNextBlockReport(); + assertTrue(scheduler.nextBlockReportTime - (now + BLOCK_REPORT_INTERVAL_MS) < 0); + } + } + + /** + * If resetBlockReportTime is false then the next block report must be scheduled + * exactly at (now + BLOCK_REPORT_INTERVAL_SEC). + */ + @Test + public void testScheduleNextBlockReport2() { + for (final long now : getTimestamps()) { + Scheduler scheduler = makeMockScheduler(now); + scheduler.resetBlockReportTime = false; + scheduler.scheduleNextBlockReport(); + assertThat(scheduler.nextBlockReportTime, is(now + BLOCK_REPORT_INTERVAL_MS)); + } + } + + /** + * Tests the case when a block report was delayed past its scheduled time. + * In that case the next block report should not be delayed for a full interval. + */ + @Test + public void testScheduleNextBlockReport3() { + for (final long now : getTimestamps()) { + Scheduler scheduler = makeMockScheduler(now); + scheduler.resetBlockReportTime = false; + + // Make it look like the block report was scheduled to be sent between 1-3 + // intervals ago but sent just now. + final long blockReportDelay = + BLOCK_REPORT_INTERVAL_MS + random.nextInt(2 * (int) BLOCK_REPORT_INTERVAL_MS); + final long origBlockReportTime = now - blockReportDelay; + scheduler.nextBlockReportTime = origBlockReportTime; + scheduler.scheduleNextBlockReport(); + assertTrue(scheduler.nextBlockReportTime - now < BLOCK_REPORT_INTERVAL_MS); + assertTrue(((scheduler.nextBlockReportTime - origBlockReportTime) % BLOCK_REPORT_INTERVAL_MS) == 0); + } + } + + @Test + public void testScheduleHeartbeat() { + for (final long now : getTimestamps()) { + Scheduler scheduler = makeMockScheduler(now); + scheduler.scheduleNextHeartbeat(); + assertFalse(scheduler.isHeartbeatDue(now)); + scheduler.scheduleHeartbeat(); + assertTrue(scheduler.isHeartbeatDue(now)); + } + } + + private Scheduler makeMockScheduler(long now) { + LOG.info("Using now = " + now); + Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS)); + doReturn(now).when(mockScheduler).monotonicNow(); + mockScheduler.nextBlockReportTime = now; + mockScheduler.nextHeartbeatTime = now; + return mockScheduler; + } + + List getTimestamps() { + return Arrays.asList( + 0L, Long.MIN_VALUE, Long.MAX_VALUE, // test boundaries + Long.MAX_VALUE - 1, // test integer overflow + abs(random.nextLong()), // positive random + -abs(random.nextLong())); // negative random + } +}