diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 59958fb9981..54624140ac0 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -39,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; @@ -323,10 +324,10 @@ class BPServiceActor implements Runnable { void triggerBlockReportForTests() { synchronized (ibrManager) { scheduler.scheduleHeartbeat(); - long oldBlockReportTime = scheduler.nextBlockReportTime; + long oldBlockReportTime = scheduler.getNextBlockReportTime(); scheduler.forceFullBlockReportNow(); ibrManager.notifyAll(); - while (oldBlockReportTime == scheduler.nextBlockReportTime) { + while (oldBlockReportTime == scheduler.getNextBlockReportTime()) { try { ibrManager.wait(100); } catch (InterruptedException e) { @@ -1163,8 +1164,8 @@ class BPServiceActor implements Runnable { // nextBlockReportTime and nextHeartbeatTime may be assigned/read // by testing threads (through BPServiceActor#triggerXXX), while also // assigned/read by the actor thread. - @VisibleForTesting - volatile long nextBlockReportTime = monotonicNow(); + private final AtomicLong nextBlockReportTime = + new AtomicLong(monotonicNow()); @VisibleForTesting volatile long nextHeartbeatTime = monotonicNow(); @@ -1257,7 +1258,7 @@ class BPServiceActor implements Runnable { } boolean isBlockReportDue(long curTime) { - return nextBlockReportTime - curTime <= 0; + return nextBlockReportTime.get() - curTime <= 0; } boolean isOutliersReportDue(long curTime) { @@ -1281,15 +1282,15 @@ class BPServiceActor implements Runnable { long scheduleBlockReport(long delay, boolean isRegistration) { if (delay > 0) { // send BR after random delay // Numerical overflow is possible here and is okay. - nextBlockReportTime = - monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay)); + nextBlockReportTime.getAndSet( + monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay))); } else { // send at next heartbeat - nextBlockReportTime = monotonicNow(); + nextBlockReportTime.getAndSet(monotonicNow()); } resetBlockReportTime = isRegistration; // reset future BRs for // randomness, post first block report to avoid regular BRs from all // DN's coming at one time. - return nextBlockReportTime; + return nextBlockReportTime.get(); } /** @@ -1302,8 +1303,8 @@ class BPServiceActor implements Runnable { // If we have sent the first set of block reports, then wait a random // time before we start the periodic block reports. if (resetBlockReportTime) { - nextBlockReportTime = monotonicNow() + - ThreadLocalRandom.current().nextInt((int)(blockReportIntervalMs)); + nextBlockReportTime.getAndSet(monotonicNow() + + ThreadLocalRandom.current().nextInt((int) (blockReportIntervalMs))); resetBlockReportTime = false; } else { /* say the last block report was at 8:20:14. The current report @@ -1313,17 +1314,16 @@ class BPServiceActor implements Runnable { * 2) unexpected like 21:35:43, next report should be at 2:20:14 * on the next day. */ - long factor = - (monotonicNow() - nextBlockReportTime + blockReportIntervalMs) - / blockReportIntervalMs; + long factor = (monotonicNow() - nextBlockReportTime.get() + + blockReportIntervalMs) / blockReportIntervalMs; if (factor != 0) { - nextBlockReportTime += factor * blockReportIntervalMs; + nextBlockReportTime.getAndAdd(factor * blockReportIntervalMs); } else { // If the difference between the present time and the scheduled // time is very less, the factor can be 0, so in that case, we can // ignore that negligible time, spent while sending the BRss and // schedule the next BR after the blockReportInterval. - nextBlockReportTime += blockReportIntervalMs; + nextBlockReportTime.getAndAdd(blockReportIntervalMs); } } } @@ -1336,6 +1336,16 @@ class BPServiceActor implements Runnable { return nextLifelineTime - monotonicNow(); } + @VisibleForTesting + long getNextBlockReportTime() { + return nextBlockReportTime.get(); + } + + @VisibleForTesting + void setNextBlockReportTime(long nextBlockReportTime) { + this.nextBlockReportTime.getAndSet(nextBlockReportTime); + } + /** * Wrapped for testing. * @return diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java index 36b6c695c87..b07e9e461f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java @@ -31,6 +31,7 @@ import java.util.Random; import static java.lang.Math.abs; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -70,7 +71,7 @@ public class TestBpServiceActorScheduler { Scheduler scheduler = makeMockScheduler(now); scheduler.scheduleBlockReport(0, true); assertTrue(scheduler.resetBlockReportTime); - assertThat(scheduler.nextBlockReportTime, is(now)); + assertThat(scheduler.getNextBlockReportTime(), is(now)); } } @@ -81,8 +82,8 @@ public class TestBpServiceActorScheduler { final long delayMs = 10; scheduler.scheduleBlockReport(delayMs, true); assertTrue(scheduler.resetBlockReportTime); - assertTrue(scheduler.nextBlockReportTime - now >= 0); - assertTrue(scheduler.nextBlockReportTime - (now + delayMs) < 0); + assertTrue(scheduler.getNextBlockReportTime() - now >= 0); + assertTrue(scheduler.getNextBlockReportTime() - (now + delayMs) < 0); } } @@ -96,7 +97,8 @@ public class TestBpServiceActorScheduler { Scheduler scheduler = makeMockScheduler(now); assertTrue(scheduler.resetBlockReportTime); scheduler.scheduleNextBlockReport(); - assertTrue(scheduler.nextBlockReportTime - (now + BLOCK_REPORT_INTERVAL_MS) < 0); + assertTrue(scheduler.getNextBlockReportTime() + - (now + BLOCK_REPORT_INTERVAL_MS) < 0); } } @@ -110,7 +112,8 @@ public class TestBpServiceActorScheduler { Scheduler scheduler = makeMockScheduler(now); scheduler.resetBlockReportTime = false; scheduler.scheduleNextBlockReport(); - assertThat(scheduler.nextBlockReportTime, is(now + BLOCK_REPORT_INTERVAL_MS)); + assertThat(scheduler.getNextBlockReportTime(), + is(now + BLOCK_REPORT_INTERVAL_MS)); } } @@ -129,10 +132,12 @@ public class TestBpServiceActorScheduler { final long blockReportDelay = BLOCK_REPORT_INTERVAL_MS + random.nextInt(2 * (int) BLOCK_REPORT_INTERVAL_MS); final long origBlockReportTime = now - blockReportDelay; - scheduler.nextBlockReportTime = origBlockReportTime; + scheduler.setNextBlockReportTime(origBlockReportTime); scheduler.scheduleNextBlockReport(); - assertTrue(scheduler.nextBlockReportTime - now < BLOCK_REPORT_INTERVAL_MS); - assertTrue(((scheduler.nextBlockReportTime - origBlockReportTime) % BLOCK_REPORT_INTERVAL_MS) == 0); + assertTrue((scheduler.getNextBlockReportTime() - now) + < BLOCK_REPORT_INTERVAL_MS); + assertEquals(0, ((scheduler.getNextBlockReportTime() - origBlockReportTime) + % BLOCK_REPORT_INTERVAL_MS)); } } @@ -201,7 +206,7 @@ public class TestBpServiceActorScheduler { HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS)); doReturn(now).when(mockScheduler).monotonicNow(); - mockScheduler.nextBlockReportTime = now; + mockScheduler.setNextBlockReportTime(now); mockScheduler.nextHeartbeatTime = now; mockScheduler.nextOutliersReportTime = now; return mockScheduler;