HDFS-8163. Using monotonicNow for block report scheduling causes test failures on recently restarted systems. (Arpit Agarwal)
This commit is contained in:
parent
8ddbb8dd43
commit
dfc1c4c303
|
@ -47,6 +47,8 @@ public final class Time {
|
||||||
* milliseconds, and not affected by settimeofday or similar system clock
|
* milliseconds, and not affected by settimeofday or similar system clock
|
||||||
* changes. This is appropriate to use when computing how much longer to
|
* changes. This is appropriate to use when computing how much longer to
|
||||||
* wait for an interval to expire.
|
* wait for an interval to expire.
|
||||||
|
* This function can return a negative value and it must be handled correctly
|
||||||
|
* by callers. See the documentation of System#nanoTime for caveats.
|
||||||
* @return a monotonic clock that counts in milliseconds.
|
* @return a monotonic clock that counts in milliseconds.
|
||||||
*/
|
*/
|
||||||
public static long monotonicNow() {
|
public static long monotonicNow() {
|
||||||
|
|
|
@ -566,6 +566,9 @@ Release 2.7.1 - UNRELEASED
|
||||||
HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor
|
HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor
|
||||||
goes for infinite loop (vinayakumarb)
|
goes for infinite loop (vinayakumarb)
|
||||||
|
|
||||||
|
HDFS-8163. Using monotonicNow for block report scheduling causes
|
||||||
|
test failures on recently restarted systems. (Arpit Agarwal)
|
||||||
|
|
||||||
Release 2.7.0 - 2015-04-20
|
Release 2.7.0 - 2015-04-20
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -429,7 +429,7 @@ class BPOfferService {
|
||||||
*/
|
*/
|
||||||
void scheduleBlockReport(long delay) {
|
void scheduleBlockReport(long delay) {
|
||||||
for (BPServiceActor actor : bpServices) {
|
for (BPServiceActor actor : bpServices) {
|
||||||
actor.scheduleBlockReport(delay);
|
actor.getScheduler().scheduleBlockReport(delay);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.apache.hadoop.util.VersionUtil;
|
import org.apache.hadoop.util.VersionUtil;
|
||||||
|
|
||||||
|
@ -82,19 +83,11 @@ class BPServiceActor implements Runnable {
|
||||||
|
|
||||||
final BPOfferService bpos;
|
final BPOfferService bpos;
|
||||||
|
|
||||||
// lastBlockReport and lastHeartbeat may be assigned/read
|
|
||||||
// by testing threads (through BPServiceActor#triggerXXX), while also
|
|
||||||
// assigned/read by the actor thread. Thus they should be declared as volatile
|
|
||||||
// to make sure the "happens-before" consistency.
|
|
||||||
volatile long lastBlockReport = 0;
|
|
||||||
|
|
||||||
boolean resetBlockReportTime = true;
|
|
||||||
|
|
||||||
volatile long lastCacheReport = 0;
|
volatile long lastCacheReport = 0;
|
||||||
|
private final Scheduler scheduler;
|
||||||
|
|
||||||
Thread bpThread;
|
Thread bpThread;
|
||||||
DatanodeProtocolClientSideTranslatorPB bpNamenode;
|
DatanodeProtocolClientSideTranslatorPB bpNamenode;
|
||||||
private volatile long lastHeartbeat = 0;
|
|
||||||
|
|
||||||
static enum RunningState {
|
static enum RunningState {
|
||||||
CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
|
CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
|
||||||
|
@ -130,6 +123,7 @@ class BPServiceActor implements Runnable {
|
||||||
this.nnAddr = nnAddr;
|
this.nnAddr = nnAddr;
|
||||||
this.dnConf = dn.getDnConf();
|
this.dnConf = dn.getDnConf();
|
||||||
prevBlockReportId = DFSUtil.getRandom().nextLong();
|
prevBlockReportId = DFSUtil.getRandom().nextLong();
|
||||||
|
scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isAlive() {
|
boolean isAlive() {
|
||||||
|
@ -232,33 +226,6 @@ class BPServiceActor implements Runnable {
|
||||||
register(nsInfo);
|
register(nsInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is useful to make sure NN gets Heartbeat before Blockreport
|
|
||||||
// upon NN restart while DN keeps retrying Otherwise,
|
|
||||||
// 1. NN restarts.
|
|
||||||
// 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
|
|
||||||
// 3. After reregistration completes, DN will send Blockreport first.
|
|
||||||
// 4. Given NN receives Blockreport after Heartbeat, it won't mark
|
|
||||||
// DatanodeStorageInfo#blockContentsStale to false until the next
|
|
||||||
// Blockreport.
|
|
||||||
void scheduleHeartbeat() {
|
|
||||||
lastHeartbeat = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This methods arranges for the data node to send the block report at
|
|
||||||
* the next heartbeat.
|
|
||||||
*/
|
|
||||||
void scheduleBlockReport(long delay) {
|
|
||||||
if (delay > 0) { // send BR after random delay
|
|
||||||
lastBlockReport = monotonicNow()
|
|
||||||
- ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
|
|
||||||
} else { // send at next heartbeat
|
|
||||||
lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
|
|
||||||
}
|
|
||||||
resetBlockReportTime = true; // reset future BRs for randomness
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report received blocks and delete hints to the Namenode for each
|
* Report received blocks and delete hints to the Namenode for each
|
||||||
* storage.
|
* storage.
|
||||||
|
@ -387,10 +354,10 @@ class BPServiceActor implements Runnable {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void triggerBlockReportForTests() {
|
void triggerBlockReportForTests() {
|
||||||
synchronized (pendingIncrementalBRperStorage) {
|
synchronized (pendingIncrementalBRperStorage) {
|
||||||
lastBlockReport = 0;
|
scheduler.scheduleHeartbeat();
|
||||||
lastHeartbeat = 0;
|
long nextBlockReportTime = scheduler.scheduleBlockReport(0);
|
||||||
pendingIncrementalBRperStorage.notifyAll();
|
pendingIncrementalBRperStorage.notifyAll();
|
||||||
while (lastBlockReport == 0) {
|
while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) {
|
||||||
try {
|
try {
|
||||||
pendingIncrementalBRperStorage.wait(100);
|
pendingIncrementalBRperStorage.wait(100);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -403,9 +370,9 @@ class BPServiceActor implements Runnable {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void triggerHeartbeatForTests() {
|
void triggerHeartbeatForTests() {
|
||||||
synchronized (pendingIncrementalBRperStorage) {
|
synchronized (pendingIncrementalBRperStorage) {
|
||||||
lastHeartbeat = 0;
|
final long nextHeartbeatTime = scheduler.scheduleHeartbeat();
|
||||||
pendingIncrementalBRperStorage.notifyAll();
|
pendingIncrementalBRperStorage.notifyAll();
|
||||||
while (lastHeartbeat == 0) {
|
while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) {
|
||||||
try {
|
try {
|
||||||
pendingIncrementalBRperStorage.wait(100);
|
pendingIncrementalBRperStorage.wait(100);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -454,8 +421,7 @@ class BPServiceActor implements Runnable {
|
||||||
*/
|
*/
|
||||||
List<DatanodeCommand> blockReport() throws IOException {
|
List<DatanodeCommand> blockReport() throws IOException {
|
||||||
// send block report if timer has expired.
|
// send block report if timer has expired.
|
||||||
final long startTime = monotonicNow();
|
if (!scheduler.isBlockReportDue()) {
|
||||||
if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -536,29 +502,10 @@ class BPServiceActor implements Runnable {
|
||||||
(nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
|
(nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
|
||||||
".");
|
".");
|
||||||
}
|
}
|
||||||
scheduleNextBlockReport(startTime);
|
scheduler.scheduleNextBlockReport();
|
||||||
return cmds.size() == 0 ? null : cmds;
|
return cmds.size() == 0 ? null : cmds;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleNextBlockReport(long previousReportStartTime) {
|
|
||||||
// If we have sent the first set of block reports, then wait a random
|
|
||||||
// time before we start the periodic block reports.
|
|
||||||
if (resetBlockReportTime) {
|
|
||||||
lastBlockReport = previousReportStartTime -
|
|
||||||
DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
|
|
||||||
resetBlockReportTime = false;
|
|
||||||
} else {
|
|
||||||
/* say the last block report was at 8:20:14. The current report
|
|
||||||
* should have started around 9:20:14 (default 1 hour interval).
|
|
||||||
* If current time is :
|
|
||||||
* 1) normal like 9:20:18, next report should be at 10:20:14
|
|
||||||
* 2) unexpected like 11:35:43, next report should be at 12:20:14
|
|
||||||
*/
|
|
||||||
lastBlockReport += (monotonicNow() - lastBlockReport) /
|
|
||||||
dnConf.blockReportInterval * dnConf.blockReportInterval;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
DatanodeCommand cacheReport() throws IOException {
|
DatanodeCommand cacheReport() throws IOException {
|
||||||
// If caching is disabled, do not send a cache report
|
// If caching is disabled, do not send a cache report
|
||||||
if (dn.getFSDataset().getCacheCapacity() == 0) {
|
if (dn.getFSDataset().getCacheCapacity() == 0) {
|
||||||
|
@ -686,13 +633,12 @@ class BPServiceActor implements Runnable {
|
||||||
//
|
//
|
||||||
while (shouldRun()) {
|
while (shouldRun()) {
|
||||||
try {
|
try {
|
||||||
final long startTime = monotonicNow();
|
final long startTime = scheduler.monotonicNow();
|
||||||
|
|
||||||
//
|
//
|
||||||
// Every so often, send heartbeat or block-report
|
// Every so often, send heartbeat or block-report
|
||||||
//
|
//
|
||||||
boolean sendHeartbeat =
|
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
|
||||||
startTime - lastHeartbeat >= dnConf.heartBeatInterval;
|
|
||||||
if (sendHeartbeat) {
|
if (sendHeartbeat) {
|
||||||
//
|
//
|
||||||
// All heartbeat messages include following info:
|
// All heartbeat messages include following info:
|
||||||
|
@ -701,11 +647,11 @@ class BPServiceActor implements Runnable {
|
||||||
// -- Total capacity
|
// -- Total capacity
|
||||||
// -- Bytes remaining
|
// -- Bytes remaining
|
||||||
//
|
//
|
||||||
lastHeartbeat = startTime;
|
scheduler.scheduleNextHeartbeat();
|
||||||
if (!dn.areHeartbeatsDisabledForTests()) {
|
if (!dn.areHeartbeatsDisabledForTests()) {
|
||||||
HeartbeatResponse resp = sendHeartBeat();
|
HeartbeatResponse resp = sendHeartBeat();
|
||||||
assert resp != null;
|
assert resp != null;
|
||||||
dn.getMetrics().addHeartbeat(monotonicNow() - startTime);
|
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
|
||||||
|
|
||||||
// If the state of this NN has changed (eg STANDBY->ACTIVE)
|
// If the state of this NN has changed (eg STANDBY->ACTIVE)
|
||||||
// then let the BPOfferService update itself.
|
// then let the BPOfferService update itself.
|
||||||
|
@ -746,8 +692,7 @@ class BPServiceActor implements Runnable {
|
||||||
// There is no work to do; sleep until hearbeat timer elapses,
|
// There is no work to do; sleep until hearbeat timer elapses,
|
||||||
// or work arrives, and then iterate again.
|
// or work arrives, and then iterate again.
|
||||||
//
|
//
|
||||||
long waitTime = dnConf.heartBeatInterval -
|
long waitTime = scheduler.getHeartbeatWaitTime();
|
||||||
(monotonicNow() - lastHeartbeat);
|
|
||||||
synchronized(pendingIncrementalBRperStorage) {
|
synchronized(pendingIncrementalBRperStorage) {
|
||||||
if (waitTime > 0 && !sendImmediateIBR) {
|
if (waitTime > 0 && !sendImmediateIBR) {
|
||||||
try {
|
try {
|
||||||
|
@ -820,7 +765,7 @@ class BPServiceActor implements Runnable {
|
||||||
bpos.registrationSucceeded(this, bpRegistration);
|
bpos.registrationSucceeded(this, bpRegistration);
|
||||||
|
|
||||||
// random short delay - helps scatter the BR from all DNs
|
// random short delay - helps scatter the BR from all DNs
|
||||||
scheduleBlockReport(dnConf.initialBlockReportDelay);
|
scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -935,7 +880,7 @@ class BPServiceActor implements Runnable {
|
||||||
NamespaceInfo nsInfo = retrieveNamespaceInfo();
|
NamespaceInfo nsInfo = retrieveNamespaceInfo();
|
||||||
// and re-register
|
// and re-register
|
||||||
register(nsInfo);
|
register(nsInfo);
|
||||||
scheduleHeartbeat();
|
scheduler.scheduleHeartbeat();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1013,7 +958,7 @@ class BPServiceActor implements Runnable {
|
||||||
} else {
|
} else {
|
||||||
LOG.info(bpos.toString() + ": scheduling a full block report.");
|
LOG.info(bpos.toString() + ": scheduling a full block report.");
|
||||||
synchronized(pendingIncrementalBRperStorage) {
|
synchronized(pendingIncrementalBRperStorage) {
|
||||||
lastBlockReport = 0;
|
scheduler.scheduleBlockReport(0);
|
||||||
pendingIncrementalBRperStorage.notifyAll();
|
pendingIncrementalBRperStorage.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1044,4 +989,116 @@ class BPServiceActor implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Scheduler getScheduler() {
|
||||||
|
return scheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class that wraps the timestamp computations for scheduling
|
||||||
|
* heartbeats and block reports.
|
||||||
|
*/
|
||||||
|
static class Scheduler {
|
||||||
|
// nextBlockReportTime and nextHeartbeatTime may be assigned/read
|
||||||
|
// by testing threads (through BPServiceActor#triggerXXX), while also
|
||||||
|
// assigned/read by the actor thread.
|
||||||
|
@VisibleForTesting
|
||||||
|
volatile long nextBlockReportTime = monotonicNow();
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
volatile long nextHeartbeatTime = monotonicNow();
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean resetBlockReportTime = true;
|
||||||
|
|
||||||
|
private final long heartbeatIntervalMs;
|
||||||
|
private final long blockReportIntervalMs;
|
||||||
|
|
||||||
|
Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
|
||||||
|
this.heartbeatIntervalMs = heartbeatIntervalMs;
|
||||||
|
this.blockReportIntervalMs = blockReportIntervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is useful to make sure NN gets Heartbeat before Blockreport
|
||||||
|
// upon NN restart while DN keeps retrying Otherwise,
|
||||||
|
// 1. NN restarts.
|
||||||
|
// 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
|
||||||
|
// 3. After reregistration completes, DN will send Blockreport first.
|
||||||
|
// 4. Given NN receives Blockreport after Heartbeat, it won't mark
|
||||||
|
// DatanodeStorageInfo#blockContentsStale to false until the next
|
||||||
|
// Blockreport.
|
||||||
|
long scheduleHeartbeat() {
|
||||||
|
nextHeartbeatTime = monotonicNow();
|
||||||
|
return nextHeartbeatTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
long scheduleNextHeartbeat() {
|
||||||
|
// Numerical overflow is possible here and is okay.
|
||||||
|
nextHeartbeatTime += heartbeatIntervalMs;
|
||||||
|
return nextHeartbeatTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isHeartbeatDue(long startTime) {
|
||||||
|
return (nextHeartbeatTime - startTime <= 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isBlockReportDue() {
|
||||||
|
return nextBlockReportTime - monotonicNow() <= 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This methods arranges for the data node to send the block report at
|
||||||
|
* the next heartbeat.
|
||||||
|
*/
|
||||||
|
long scheduleBlockReport(long delay) {
|
||||||
|
if (delay > 0) { // send BR after random delay
|
||||||
|
// Numerical overflow is possible here and is okay.
|
||||||
|
nextBlockReportTime =
|
||||||
|
monotonicNow() + DFSUtil.getRandom().nextInt((int) (delay));
|
||||||
|
} else { // send at next heartbeat
|
||||||
|
nextBlockReportTime = monotonicNow();
|
||||||
|
}
|
||||||
|
resetBlockReportTime = true; // reset future BRs for randomness
|
||||||
|
return nextBlockReportTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule the next block report after the block report interval. If the
|
||||||
|
* current block report was delayed then the next block report is sent per
|
||||||
|
* the original schedule.
|
||||||
|
* Numerical overflow is possible here.
|
||||||
|
*/
|
||||||
|
void scheduleNextBlockReport() {
|
||||||
|
// If we have sent the first set of block reports, then wait a random
|
||||||
|
// time before we start the periodic block reports.
|
||||||
|
if (resetBlockReportTime) {
|
||||||
|
nextBlockReportTime = monotonicNow() +
|
||||||
|
DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs));
|
||||||
|
resetBlockReportTime = false;
|
||||||
|
} else {
|
||||||
|
/* say the last block report was at 8:20:14. The current report
|
||||||
|
* should have started around 9:20:14 (default 1 hour interval).
|
||||||
|
* If current time is :
|
||||||
|
* 1) normal like 9:20:18, next report should be at 10:20:14
|
||||||
|
* 2) unexpected like 11:35:43, next report should be at 12:20:14
|
||||||
|
*/
|
||||||
|
nextBlockReportTime +=
|
||||||
|
(((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) /
|
||||||
|
blockReportIntervalMs)) * blockReportIntervalMs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long getHeartbeatWaitTime() {
|
||||||
|
return nextHeartbeatTime - monotonicNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapped for testing.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public long monotonicNow() {
|
||||||
|
return Time.monotonicNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,163 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
|
import static java.lang.Math.abs;
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify the block report and heartbeat scheduling logic of BPServiceActor
|
||||||
|
* using a few different values .
|
||||||
|
*/
|
||||||
|
public class TestBpServiceActorScheduler {
|
||||||
|
protected static final Log LOG = LogFactory.getLog(TestBpServiceActorScheduler.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Timeout timeout = new Timeout(300000);
|
||||||
|
|
||||||
|
private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds
|
||||||
|
private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds
|
||||||
|
private final Random random = new Random(System.nanoTime());
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInit() {
|
||||||
|
for (final long now : getTimestamps()) {
|
||||||
|
Scheduler scheduler = makeMockScheduler(now);
|
||||||
|
assertTrue(scheduler.isHeartbeatDue(now));
|
||||||
|
assertTrue(scheduler.isBlockReportDue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduleBlockReportImmediate() {
|
||||||
|
for (final long now : getTimestamps()) {
|
||||||
|
Scheduler scheduler = makeMockScheduler(now);
|
||||||
|
scheduler.scheduleBlockReport(0);
|
||||||
|
assertTrue(scheduler.resetBlockReportTime);
|
||||||
|
assertThat(scheduler.nextBlockReportTime, is(now));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduleBlockReportDelayed() {
|
||||||
|
for (final long now : getTimestamps()) {
|
||||||
|
Scheduler scheduler = makeMockScheduler(now);
|
||||||
|
final long delayMs = 10;
|
||||||
|
scheduler.scheduleBlockReport(delayMs);
|
||||||
|
assertTrue(scheduler.resetBlockReportTime);
|
||||||
|
assertTrue(scheduler.nextBlockReportTime - now >= 0);
|
||||||
|
assertTrue(scheduler.nextBlockReportTime - (now + delayMs) < 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If resetBlockReportTime is true then the next block report must be scheduled
|
||||||
|
* in the range [now, now + BLOCK_REPORT_INTERVAL_SEC).
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testScheduleNextBlockReport() {
|
||||||
|
for (final long now : getTimestamps()) {
|
||||||
|
Scheduler scheduler = makeMockScheduler(now);
|
||||||
|
assertTrue(scheduler.resetBlockReportTime);
|
||||||
|
scheduler.scheduleNextBlockReport();
|
||||||
|
assertTrue(scheduler.nextBlockReportTime - (now + BLOCK_REPORT_INTERVAL_MS) < 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If resetBlockReportTime is false then the next block report must be scheduled
|
||||||
|
* exactly at (now + BLOCK_REPORT_INTERVAL_SEC).
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testScheduleNextBlockReport2() {
|
||||||
|
for (final long now : getTimestamps()) {
|
||||||
|
Scheduler scheduler = makeMockScheduler(now);
|
||||||
|
scheduler.resetBlockReportTime = false;
|
||||||
|
scheduler.scheduleNextBlockReport();
|
||||||
|
assertThat(scheduler.nextBlockReportTime, is(now + BLOCK_REPORT_INTERVAL_MS));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the case when a block report was delayed past its scheduled time.
|
||||||
|
* In that case the next block report should not be delayed for a full interval.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testScheduleNextBlockReport3() {
|
||||||
|
for (final long now : getTimestamps()) {
|
||||||
|
Scheduler scheduler = makeMockScheduler(now);
|
||||||
|
scheduler.resetBlockReportTime = false;
|
||||||
|
|
||||||
|
// Make it look like the block report was scheduled to be sent between 1-3
|
||||||
|
// intervals ago but sent just now.
|
||||||
|
final long blockReportDelay =
|
||||||
|
BLOCK_REPORT_INTERVAL_MS + random.nextInt(2 * (int) BLOCK_REPORT_INTERVAL_MS);
|
||||||
|
final long origBlockReportTime = now - blockReportDelay;
|
||||||
|
scheduler.nextBlockReportTime = origBlockReportTime;
|
||||||
|
scheduler.scheduleNextBlockReport();
|
||||||
|
assertTrue(scheduler.nextBlockReportTime - now < BLOCK_REPORT_INTERVAL_MS);
|
||||||
|
assertTrue(((scheduler.nextBlockReportTime - origBlockReportTime) % BLOCK_REPORT_INTERVAL_MS) == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduleHeartbeat() {
|
||||||
|
for (final long now : getTimestamps()) {
|
||||||
|
Scheduler scheduler = makeMockScheduler(now);
|
||||||
|
scheduler.scheduleNextHeartbeat();
|
||||||
|
assertFalse(scheduler.isHeartbeatDue(now));
|
||||||
|
scheduler.scheduleHeartbeat();
|
||||||
|
assertTrue(scheduler.isHeartbeatDue(now));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Scheduler makeMockScheduler(long now) {
|
||||||
|
LOG.info("Using now = " + now);
|
||||||
|
Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
|
||||||
|
doReturn(now).when(mockScheduler).monotonicNow();
|
||||||
|
mockScheduler.nextBlockReportTime = now;
|
||||||
|
mockScheduler.nextHeartbeatTime = now;
|
||||||
|
return mockScheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Long> getTimestamps() {
|
||||||
|
return Arrays.asList(
|
||||||
|
0L, Long.MIN_VALUE, Long.MAX_VALUE, // test boundaries
|
||||||
|
Long.MAX_VALUE - 1, // test integer overflow
|
||||||
|
abs(random.nextLong()), // positive random
|
||||||
|
-abs(random.nextLong())); // negative random
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue