HBASE-21164 reportForDuty should do backoff rather than retry
Remove unused methods from Sleeper (its ok, its @Private). Remove notion of startTime from Sleeper handling (it is is unused). Allow passing in how long to sleep so can maintain externally. In HRS, use a RetryCounter to calculate backoff sleep time for when reportForDuty is failing against a struggling Master. Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java
This commit is contained in:
parent
47e42dab75
commit
f4af3881c1
|
@ -174,4 +174,14 @@ public class RetryCounter {
|
||||||
public int getAttemptTimes() {
|
public int getAttemptTimes() {
|
||||||
return attempts;
|
return attempts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getBackoffTime() {
|
||||||
|
return this.retryConfig.backoffPolicy.getBackoffTime(this.retryConfig, getAttemptTimes());
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getBackoffTimeAndIncrementAttempts() {
|
||||||
|
long backoffTime = getBackoffTime();
|
||||||
|
useRetry();
|
||||||
|
return backoffTime;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,13 +49,6 @@ public class Sleeper {
|
||||||
this.stopper = stopper;
|
this.stopper = stopper;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Sleep for period.
|
|
||||||
*/
|
|
||||||
public void sleep() {
|
|
||||||
sleep(System.currentTimeMillis());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If currently asleep, stops sleeping; if not asleep, will skip the next
|
* If currently asleep, stops sleeping; if not asleep, will skip the next
|
||||||
* sleep cycle.
|
* sleep cycle.
|
||||||
|
@ -68,28 +61,24 @@ public class Sleeper {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sleep for period adjusted by passed <code>startTime</code>
|
* Sleep for period.
|
||||||
* @param startTime Time some task started previous to now. Time to sleep
|
|
||||||
* will be docked current time minus passed <code>startTime</code>.
|
|
||||||
*/
|
*/
|
||||||
public void sleep(final long startTime) {
|
public void sleep() {
|
||||||
|
sleep(this.period);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sleep(long sleepTime) {
|
||||||
if (this.stopper.isStopped()) {
|
if (this.stopper.isStopped()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
long waitTime = this.period - (now - startTime);
|
long currentSleepTime = sleepTime;
|
||||||
if (waitTime > this.period) {
|
while (currentSleepTime > 0) {
|
||||||
LOG.warn("Calculated wait time > " + this.period +
|
|
||||||
"; setting to this.period: " + System.currentTimeMillis() + ", " +
|
|
||||||
startTime);
|
|
||||||
waitTime = this.period;
|
|
||||||
}
|
|
||||||
while (waitTime > 0) {
|
|
||||||
long woke = -1;
|
long woke = -1;
|
||||||
try {
|
try {
|
||||||
synchronized (sleepLock) {
|
synchronized (sleepLock) {
|
||||||
if (triggerWake) break;
|
if (triggerWake) break;
|
||||||
sleepLock.wait(waitTime);
|
sleepLock.wait(currentSleepTime);
|
||||||
}
|
}
|
||||||
woke = System.currentTimeMillis();
|
woke = System.currentTimeMillis();
|
||||||
long slept = woke - now;
|
long slept = woke - now;
|
||||||
|
@ -108,7 +97,7 @@ public class Sleeper {
|
||||||
}
|
}
|
||||||
// Recalculate waitTime.
|
// Recalculate waitTime.
|
||||||
woke = (woke == -1)? System.currentTimeMillis(): woke;
|
woke = (woke == -1)? System.currentTimeMillis(): woke;
|
||||||
waitTime = this.period - (woke - startTime);
|
currentSleepTime = this.period - (woke - now);
|
||||||
}
|
}
|
||||||
synchronized(sleepLock) {
|
synchronized(sleepLock) {
|
||||||
triggerWake = false;
|
triggerWake = false;
|
||||||
|
|
|
@ -2649,7 +2649,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||||
stop("Stopped by " + Thread.currentThread().getName());
|
stop("Stopped by " + Thread.currentThread().getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
void checkServiceStarted() throws ServerNotRunningYetException {
|
@VisibleForTesting
|
||||||
|
protected void checkServiceStarted() throws ServerNotRunningYetException {
|
||||||
if (!serviceStarted) {
|
if (!serviceStarted) {
|
||||||
throw new ServerNotRunningYetException("Server is not running yet");
|
throw new ServerNotRunningYetException("Server is not running yet");
|
||||||
}
|
}
|
||||||
|
|
|
@ -182,6 +182,8 @@ import org.apache.hadoop.hbase.util.HasThread;
|
||||||
import org.apache.hadoop.hbase.util.JSONBean;
|
import org.apache.hadoop.hbase.util.JSONBean;
|
||||||
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
|
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
|
||||||
import org.apache.hadoop.hbase.util.MBeanUtil;
|
import org.apache.hadoop.hbase.util.MBeanUtil;
|
||||||
|
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||||
|
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
||||||
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||||
import org.apache.hadoop.hbase.util.Sleeper;
|
import org.apache.hadoop.hbase.util.Sleeper;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
@ -972,13 +974,18 @@ public class HRegionServer extends HasThread implements
|
||||||
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
|
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try and register with the Master; tell it we are here. Break if
|
// Try and register with the Master; tell it we are here. Break if server is stopped or the
|
||||||
// server is stopped or the clusterup flag is down or hdfs went wacky.
|
// clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and start
|
||||||
|
// up all Services. Use RetryCounter to get backoff in case Master is struggling to come up.
|
||||||
|
RetryCounterFactory rcf = new RetryCounterFactory(Integer.MAX_VALUE,
|
||||||
|
this.sleeper.getPeriod(), 1000 * 60 * 5);
|
||||||
|
RetryCounter rc = rcf.create();
|
||||||
while (keepLooping()) {
|
while (keepLooping()) {
|
||||||
RegionServerStartupResponse w = reportForDuty();
|
RegionServerStartupResponse w = reportForDuty();
|
||||||
if (w == null) {
|
if (w == null) {
|
||||||
LOG.warn("reportForDuty failed; sleeping and then retrying.");
|
long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
|
||||||
this.sleeper.sleep();
|
LOG.warn("reportForDuty failed; sleeping " + sleepTime + " ms and then retrying");
|
||||||
|
this.sleeper.sleep(sleepTime);
|
||||||
} else {
|
} else {
|
||||||
handleReportForDutyResponse(w);
|
handleReportForDutyResponse(w);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -21,7 +21,9 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.StringWriter;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -30,12 +32,19 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.LocalHBaseCluster;
|
import org.apache.hadoop.hbase.LocalHBaseCluster;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
|
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.master.ServerManager;
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||||
|
import org.apache.log4j.Appender;
|
||||||
|
import org.apache.log4j.Layout;
|
||||||
|
import org.apache.log4j.PatternLayout;
|
||||||
|
import org.apache.log4j.WriterAppender;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -72,6 +81,86 @@ public class TestRegionServerReportForDuty {
|
||||||
testUtil.shutdownMiniDFSCluster();
|
testUtil.shutdownMiniDFSCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* LogCapturer is similar to {@link org.apache.hadoop.test.GenericTestUtils.LogCapturer}
|
||||||
|
* except that this implementation has a default appender to the root logger.
|
||||||
|
* Hadoop 2.8+ supports the default appender in the LogCapture it ships and this can be replaced.
|
||||||
|
* TODO: This class can be removed after we upgrade Hadoop dependency.
|
||||||
|
*/
|
||||||
|
static class LogCapturer {
|
||||||
|
private StringWriter sw = new StringWriter();
|
||||||
|
private WriterAppender appender;
|
||||||
|
private org.apache.log4j.Logger logger;
|
||||||
|
|
||||||
|
LogCapturer(org.apache.log4j.Logger logger) {
|
||||||
|
this.logger = logger;
|
||||||
|
Appender defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("stdout");
|
||||||
|
if (defaultAppender == null) {
|
||||||
|
defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("console");
|
||||||
|
}
|
||||||
|
final Layout layout = (defaultAppender == null) ? new PatternLayout() :
|
||||||
|
defaultAppender.getLayout();
|
||||||
|
this.appender = new WriterAppender(layout, sw);
|
||||||
|
this.logger.addAppender(this.appender);
|
||||||
|
}
|
||||||
|
|
||||||
|
String getOutput() {
|
||||||
|
return sw.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stopCapturing() {
|
||||||
|
this.logger.removeAppender(this.appender);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test HMaster class will always throw ServerNotRunningYetException if checked.
|
||||||
|
*/
|
||||||
|
public static class NeverInitializedMaster extends HMaster {
|
||||||
|
public NeverInitializedMaster(Configuration conf, CoordinatedStateManager csm)
|
||||||
|
throws IOException, KeeperException, InterruptedException {
|
||||||
|
super(conf, csm);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void checkServiceStarted() throws ServerNotRunningYetException {
|
||||||
|
throw new ServerNotRunningYetException("Server is not running yet");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests region server should backoff to report for duty if master is not ready.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testReportForDutyBackoff() throws IOException, InterruptedException {
|
||||||
|
cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
|
||||||
|
master = cluster.addMaster();
|
||||||
|
master.start();
|
||||||
|
|
||||||
|
LogCapturer capturer = new LogCapturer(org.apache.log4j.Logger.getLogger(HRegionServer.class));
|
||||||
|
// Set sleep interval relatively low so that exponential backoff is more demanding.
|
||||||
|
int msginterval = 100;
|
||||||
|
cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
|
||||||
|
rs = cluster.addRegionServer();
|
||||||
|
rs.start();
|
||||||
|
|
||||||
|
int interval = 10_000;
|
||||||
|
Thread.sleep(interval);
|
||||||
|
capturer.stopCapturing();
|
||||||
|
String output = capturer.getOutput();
|
||||||
|
LOG.info(output);
|
||||||
|
String failMsg = "reportForDuty failed;";
|
||||||
|
int count = StringUtils.countMatches(output, failMsg);
|
||||||
|
|
||||||
|
// Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2).
|
||||||
|
// Ideally we can assert the exact retry count. We relax here to tolerate contention error.
|
||||||
|
int expectedRetry = (int)Math.ceil(Math.log(interval - msginterval));
|
||||||
|
assertTrue(String.format("reportForDuty retries %d times, less than expected min %d",
|
||||||
|
count, expectedRetry / 2), count > expectedRetry / 2);
|
||||||
|
assertTrue(String.format("reportForDuty retries %d times, more than expected max %d",
|
||||||
|
count, expectedRetry * 2), count < expectedRetry * 2);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests region sever reportForDuty with backup master becomes primary master after
|
* Tests region sever reportForDuty with backup master becomes primary master after
|
||||||
* the first master goes away.
|
* the first master goes away.
|
||||||
|
|
Loading…
Reference in New Issue