HDFS-8163. Using monotonicNow for block report scheduling causes test failures on recently restarted systems. (Arpit Agarwal)

This commit is contained in:
Arpit Agarwal 2015-04-21 10:58:05 -07:00
parent e7bb0fc922
commit 7b3acc5c9d
5 changed files with 299 additions and 74 deletions

View File

@ -47,6 +47,8 @@ public final class Time {
* milliseconds, and not affected by settimeofday or similar system clock * milliseconds, and not affected by settimeofday or similar system clock
* changes. This is appropriate to use when computing how much longer to * changes. This is appropriate to use when computing how much longer to
* wait for an interval to expire. * wait for an interval to expire.
* This function can return a negative value and it must be handled correctly
* by callers. See the documentation of System#nanoTime for caveats.
* @return a monotonic clock that counts in milliseconds. * @return a monotonic clock that counts in milliseconds.
*/ */
public static long monotonicNow() { public static long monotonicNow() {

View File

@ -248,6 +248,9 @@ Release 2.7.1 - UNRELEASED
HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor
goes for infinite loop (vinayakumarb) goes for infinite loop (vinayakumarb)
HDFS-8163. Using monotonicNow for block report scheduling causes
test failures on recently restarted systems. (Arpit Agarwal)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -430,7 +430,7 @@ class BPOfferService {
*/ */
void scheduleBlockReport(long delay) { void scheduleBlockReport(long delay) {
for (BPServiceActor actor : bpServices) { for (BPServiceActor actor : bpServices) {
actor.scheduleBlockReport(delay); actor.getScheduler().scheduleBlockReport(delay);
} }
} }

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.util.VersionUtil;
@ -82,19 +83,11 @@ class BPServiceActor implements Runnable {
final BPOfferService bpos; final BPOfferService bpos;
// lastBlockReport and lastHeartbeat may be assigned/read
// by testing threads (through BPServiceActor#triggerXXX), while also
// assigned/read by the actor thread. Thus they should be declared as volatile
// to make sure the "happens-before" consistency.
volatile long lastBlockReport = 0;
boolean resetBlockReportTime = true;
volatile long lastCacheReport = 0; volatile long lastCacheReport = 0;
private final Scheduler scheduler;
Thread bpThread; Thread bpThread;
DatanodeProtocolClientSideTranslatorPB bpNamenode; DatanodeProtocolClientSideTranslatorPB bpNamenode;
private volatile long lastHeartbeat = 0;
static enum RunningState { static enum RunningState {
CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED; CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
@ -130,6 +123,7 @@ class BPServiceActor implements Runnable {
this.nnAddr = nnAddr; this.nnAddr = nnAddr;
this.dnConf = dn.getDnConf(); this.dnConf = dn.getDnConf();
prevBlockReportId = DFSUtil.getRandom().nextLong(); prevBlockReportId = DFSUtil.getRandom().nextLong();
scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
} }
boolean isAlive() { boolean isAlive() {
@ -232,33 +226,6 @@ class BPServiceActor implements Runnable {
register(nsInfo); register(nsInfo);
} }
// This is useful to make sure NN gets Heartbeat before Blockreport
// upon NN restart while DN keeps retrying Otherwise,
// 1. NN restarts.
// 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
// 3. After reregistration completes, DN will send Blockreport first.
// 4. Given NN receives Blockreport after Heartbeat, it won't mark
// DatanodeStorageInfo#blockContentsStale to false until the next
// Blockreport.
void scheduleHeartbeat() {
lastHeartbeat = 0;
}
/**
* This methods arranges for the data node to send the block report at
* the next heartbeat.
*/
void scheduleBlockReport(long delay) {
if (delay > 0) { // send BR after random delay
lastBlockReport = monotonicNow()
- ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
} else { // send at next heartbeat
lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
}
resetBlockReportTime = true; // reset future BRs for randomness
}
/** /**
* Report received blocks and delete hints to the Namenode for each * Report received blocks and delete hints to the Namenode for each
* storage. * storage.
@ -387,10 +354,10 @@ class BPServiceActor implements Runnable {
@VisibleForTesting @VisibleForTesting
void triggerBlockReportForTests() { void triggerBlockReportForTests() {
synchronized (pendingIncrementalBRperStorage) { synchronized (pendingIncrementalBRperStorage) {
lastBlockReport = 0; scheduler.scheduleHeartbeat();
lastHeartbeat = 0; long nextBlockReportTime = scheduler.scheduleBlockReport(0);
pendingIncrementalBRperStorage.notifyAll(); pendingIncrementalBRperStorage.notifyAll();
while (lastBlockReport == 0) { while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) {
try { try {
pendingIncrementalBRperStorage.wait(100); pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -403,9 +370,9 @@ class BPServiceActor implements Runnable {
@VisibleForTesting @VisibleForTesting
void triggerHeartbeatForTests() { void triggerHeartbeatForTests() {
synchronized (pendingIncrementalBRperStorage) { synchronized (pendingIncrementalBRperStorage) {
lastHeartbeat = 0; final long nextHeartbeatTime = scheduler.scheduleHeartbeat();
pendingIncrementalBRperStorage.notifyAll(); pendingIncrementalBRperStorage.notifyAll();
while (lastHeartbeat == 0) { while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) {
try { try {
pendingIncrementalBRperStorage.wait(100); pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -454,8 +421,7 @@ class BPServiceActor implements Runnable {
*/ */
List<DatanodeCommand> blockReport() throws IOException { List<DatanodeCommand> blockReport() throws IOException {
// send block report if timer has expired. // send block report if timer has expired.
final long startTime = monotonicNow(); if (!scheduler.isBlockReportDue()) {
if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
return null; return null;
} }
@ -536,29 +502,10 @@ class BPServiceActor implements Runnable {
(nCmds + " commands: " + Joiner.on("; ").join(cmds)))) + (nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
"."); ".");
} }
scheduleNextBlockReport(startTime); scheduler.scheduleNextBlockReport();
return cmds.size() == 0 ? null : cmds; return cmds.size() == 0 ? null : cmds;
} }
private void scheduleNextBlockReport(long previousReportStartTime) {
// If we have sent the first set of block reports, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
lastBlockReport = previousReportStartTime -
DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
* should have started around 9:20:14 (default 1 hour interval).
* If current time is :
* 1) normal like 9:20:18, next report should be at 10:20:14
* 2) unexpected like 11:35:43, next report should be at 12:20:14
*/
lastBlockReport += (monotonicNow() - lastBlockReport) /
dnConf.blockReportInterval * dnConf.blockReportInterval;
}
}
DatanodeCommand cacheReport() throws IOException { DatanodeCommand cacheReport() throws IOException {
// If caching is disabled, do not send a cache report // If caching is disabled, do not send a cache report
if (dn.getFSDataset().getCacheCapacity() == 0) { if (dn.getFSDataset().getCacheCapacity() == 0) {
@ -686,13 +633,12 @@ class BPServiceActor implements Runnable {
// //
while (shouldRun()) { while (shouldRun()) {
try { try {
final long startTime = monotonicNow(); final long startTime = scheduler.monotonicNow();
// //
// Every so often, send heartbeat or block-report // Every so often, send heartbeat or block-report
// //
boolean sendHeartbeat = final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
startTime - lastHeartbeat >= dnConf.heartBeatInterval;
if (sendHeartbeat) { if (sendHeartbeat) {
// //
// All heartbeat messages include following info: // All heartbeat messages include following info:
@ -701,11 +647,11 @@ class BPServiceActor implements Runnable {
// -- Total capacity // -- Total capacity
// -- Bytes remaining // -- Bytes remaining
// //
lastHeartbeat = startTime; scheduler.scheduleNextHeartbeat();
if (!dn.areHeartbeatsDisabledForTests()) { if (!dn.areHeartbeatsDisabledForTests()) {
HeartbeatResponse resp = sendHeartBeat(); HeartbeatResponse resp = sendHeartBeat();
assert resp != null; assert resp != null;
dn.getMetrics().addHeartbeat(monotonicNow() - startTime); dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE) // If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself. // then let the BPOfferService update itself.
@ -746,8 +692,7 @@ class BPServiceActor implements Runnable {
// There is no work to do; sleep until hearbeat timer elapses, // There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again. // or work arrives, and then iterate again.
// //
long waitTime = dnConf.heartBeatInterval - long waitTime = scheduler.getHeartbeatWaitTime();
(monotonicNow() - lastHeartbeat);
synchronized(pendingIncrementalBRperStorage) { synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && !sendImmediateIBR) { if (waitTime > 0 && !sendImmediateIBR) {
try { try {
@ -820,7 +765,7 @@ class BPServiceActor implements Runnable {
bpos.registrationSucceeded(this, bpRegistration); bpos.registrationSucceeded(this, bpRegistration);
// random short delay - helps scatter the BR from all DNs // random short delay - helps scatter the BR from all DNs
scheduleBlockReport(dnConf.initialBlockReportDelay); scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay);
} }
@ -935,7 +880,7 @@ class BPServiceActor implements Runnable {
NamespaceInfo nsInfo = retrieveNamespaceInfo(); NamespaceInfo nsInfo = retrieveNamespaceInfo();
// and re-register // and re-register
register(nsInfo); register(nsInfo);
scheduleHeartbeat(); scheduler.scheduleHeartbeat();
} }
} }
@ -1013,7 +958,7 @@ class BPServiceActor implements Runnable {
} else { } else {
LOG.info(bpos.toString() + ": scheduling a full block report."); LOG.info(bpos.toString() + ": scheduling a full block report.");
synchronized(pendingIncrementalBRperStorage) { synchronized(pendingIncrementalBRperStorage) {
lastBlockReport = 0; scheduler.scheduleBlockReport(0);
pendingIncrementalBRperStorage.notifyAll(); pendingIncrementalBRperStorage.notifyAll();
} }
} }
@ -1044,4 +989,116 @@ class BPServiceActor implements Runnable {
} }
} }
} }
Scheduler getScheduler() {
return scheduler;
}
/**
* Utility class that wraps the timestamp computations for scheduling
* heartbeats and block reports.
*/
static class Scheduler {
// nextBlockReportTime and nextHeartbeatTime may be assigned/read
// by testing threads (through BPServiceActor#triggerXXX), while also
// assigned/read by the actor thread.
@VisibleForTesting
volatile long nextBlockReportTime = monotonicNow();
@VisibleForTesting
volatile long nextHeartbeatTime = monotonicNow();
@VisibleForTesting
boolean resetBlockReportTime = true;
private final long heartbeatIntervalMs;
private final long blockReportIntervalMs;
Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.blockReportIntervalMs = blockReportIntervalMs;
}
// This is useful to make sure NN gets Heartbeat before Blockreport
// upon NN restart while DN keeps retrying Otherwise,
// 1. NN restarts.
// 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
// 3. After reregistration completes, DN will send Blockreport first.
// 4. Given NN receives Blockreport after Heartbeat, it won't mark
// DatanodeStorageInfo#blockContentsStale to false until the next
// Blockreport.
long scheduleHeartbeat() {
nextHeartbeatTime = monotonicNow();
return nextHeartbeatTime;
}
long scheduleNextHeartbeat() {
// Numerical overflow is possible here and is okay.
nextHeartbeatTime += heartbeatIntervalMs;
return nextHeartbeatTime;
}
boolean isHeartbeatDue(long startTime) {
return (nextHeartbeatTime - startTime <= 0);
}
boolean isBlockReportDue() {
return nextBlockReportTime - monotonicNow() <= 0;
}
/**
* This methods arranges for the data node to send the block report at
* the next heartbeat.
*/
long scheduleBlockReport(long delay) {
if (delay > 0) { // send BR after random delay
// Numerical overflow is possible here and is okay.
nextBlockReportTime =
monotonicNow() + DFSUtil.getRandom().nextInt((int) (delay));
} else { // send at next heartbeat
nextBlockReportTime = monotonicNow();
}
resetBlockReportTime = true; // reset future BRs for randomness
return nextBlockReportTime;
}
/**
* Schedule the next block report after the block report interval. If the
* current block report was delayed then the next block report is sent per
* the original schedule.
* Numerical overflow is possible here.
*/
void scheduleNextBlockReport() {
// If we have sent the first set of block reports, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
nextBlockReportTime = monotonicNow() +
DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
* should have started around 9:20:14 (default 1 hour interval).
* If current time is :
* 1) normal like 9:20:18, next report should be at 10:20:14
* 2) unexpected like 11:35:43, next report should be at 12:20:14
*/
nextBlockReportTime +=
(((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) /
blockReportIntervalMs)) * blockReportIntervalMs;
}
}
long getHeartbeatWaitTime() {
return nextHeartbeatTime - monotonicNow();
}
/**
* Wrapped for testing.
* @return
*/
@VisibleForTesting
public long monotonicNow() {
return Time.monotonicNow();
}
}
} }

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static java.lang.Math.abs;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
* Verify the block report and heartbeat scheduling logic of BPServiceActor
* using a few different values .
*/
public class TestBpServiceActorScheduler {
protected static final Log LOG = LogFactory.getLog(TestBpServiceActorScheduler.class);
@Rule
public Timeout timeout = new Timeout(300000);
private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds
private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds
private final Random random = new Random(System.nanoTime());
@Test
public void testInit() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
assertTrue(scheduler.isHeartbeatDue(now));
assertTrue(scheduler.isBlockReportDue());
}
}
@Test
public void testScheduleBlockReportImmediate() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
scheduler.scheduleBlockReport(0);
assertTrue(scheduler.resetBlockReportTime);
assertThat(scheduler.nextBlockReportTime, is(now));
}
}
@Test
public void testScheduleBlockReportDelayed() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
final long delayMs = 10;
scheduler.scheduleBlockReport(delayMs);
assertTrue(scheduler.resetBlockReportTime);
assertTrue(scheduler.nextBlockReportTime - now >= 0);
assertTrue(scheduler.nextBlockReportTime - (now + delayMs) < 0);
}
}
/**
* If resetBlockReportTime is true then the next block report must be scheduled
* in the range [now, now + BLOCK_REPORT_INTERVAL_SEC).
*/
@Test
public void testScheduleNextBlockReport() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
assertTrue(scheduler.resetBlockReportTime);
scheduler.scheduleNextBlockReport();
assertTrue(scheduler.nextBlockReportTime - (now + BLOCK_REPORT_INTERVAL_MS) < 0);
}
}
/**
* If resetBlockReportTime is false then the next block report must be scheduled
* exactly at (now + BLOCK_REPORT_INTERVAL_SEC).
*/
@Test
public void testScheduleNextBlockReport2() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
scheduler.resetBlockReportTime = false;
scheduler.scheduleNextBlockReport();
assertThat(scheduler.nextBlockReportTime, is(now + BLOCK_REPORT_INTERVAL_MS));
}
}
/**
* Tests the case when a block report was delayed past its scheduled time.
* In that case the next block report should not be delayed for a full interval.
*/
@Test
public void testScheduleNextBlockReport3() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
scheduler.resetBlockReportTime = false;
// Make it look like the block report was scheduled to be sent between 1-3
// intervals ago but sent just now.
final long blockReportDelay =
BLOCK_REPORT_INTERVAL_MS + random.nextInt(2 * (int) BLOCK_REPORT_INTERVAL_MS);
final long origBlockReportTime = now - blockReportDelay;
scheduler.nextBlockReportTime = origBlockReportTime;
scheduler.scheduleNextBlockReport();
assertTrue(scheduler.nextBlockReportTime - now < BLOCK_REPORT_INTERVAL_MS);
assertTrue(((scheduler.nextBlockReportTime - origBlockReportTime) % BLOCK_REPORT_INTERVAL_MS) == 0);
}
}
@Test
public void testScheduleHeartbeat() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
scheduler.scheduleNextHeartbeat();
assertFalse(scheduler.isHeartbeatDue(now));
scheduler.scheduleHeartbeat();
assertTrue(scheduler.isHeartbeatDue(now));
}
}
private Scheduler makeMockScheduler(long now) {
LOG.info("Using now = " + now);
Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.nextBlockReportTime = now;
mockScheduler.nextHeartbeatTime = now;
return mockScheduler;
}
List<Long> getTimestamps() {
return Arrays.asList(
0L, Long.MIN_VALUE, Long.MAX_VALUE, // test boundaries
Long.MAX_VALUE - 1, // test integer overflow
abs(random.nextLong()), // positive random
-abs(random.nextLong())); // negative random
}
}