HBASE-9750 Add retries around Action server stop/start
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1536908 13f79535-47bb-0310-9956-ffa450edef68
Parent: ff23790dd3
Commit: b73e8b3b5a

@@ -111,7 +111,7 @@ public class RecoverableZooKeeper {
     // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
     this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
     this.retryCounterFactory =
-      new RetryCounterFactory(maxRetries, retryIntervalMillis);
+      new RetryCounterFactory(maxRetries+1, retryIntervalMillis);
 
     if (identifier == null || identifier.length() == 0) {
       // the identifier = processID@hostName
@@ -177,7 +177,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
           isRetry = true;
         }
       } finally {
@@ -211,7 +210,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
         }
       } finally {
         if (traceScope != null) traceScope.close();
@@ -244,7 +242,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
         }
       } finally {
         if (traceScope != null) traceScope.close();
@@ -256,7 +253,7 @@ public class RecoverableZooKeeper {
       LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
       if (!retryCounter.shouldRetry()) {
         LOG.error("ZooKeeper " + opName + " failed after "
-          + retryCounter.getMaxRetries() + " retries");
+          + retryCounter.getMaxAttempts() + " attempts");
         throw e;
       }
     }
@@ -287,7 +284,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
         }
       } finally {
         if (traceScope != null) traceScope.close();
@@ -320,7 +316,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
         }
       } finally {
         if (traceScope != null) traceScope.close();
@@ -354,7 +349,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
         }
       } finally {
         if (traceScope != null) traceScope.close();
@@ -388,7 +382,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
         }
       } finally {
         if (traceScope != null) traceScope.close();
@@ -440,7 +433,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
           isRetry = true;
         }
       } finally {
@@ -528,7 +520,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
           isRetry = true;
         }
       }
@@ -563,7 +554,6 @@ public class RecoverableZooKeeper {
           }
         }
         retryCounter.sleepUntilNextRetry();
-        retryCounter.useRetry();
       }
     }
   /**
@@ -620,7 +610,6 @@ public class RecoverableZooKeeper {
             }
           }
           retryCounter.sleepUntilNextRetry();
-          retryCounter.useRetry();
         }
       } finally {
         if (traceScope != null) traceScope.close();
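
The RecoverableZooKeeper hunks above all follow the same pattern: the explicit useRetry() call disappears because the reworked RetryCounter (next hunk) now advances the attempt count inside sleepUntilNextRetry(), and the factory is built with maxRetries+1 because the counter now counts attempts (the first try included) rather than retries. Below is a minimal sketch of that calling pattern, assuming only the RetryCounter/RetryCounterFactory API from this commit; runWithRetries and the Callable wrapper are illustrative, not RecoverableZooKeeper's actual methods.

    import java.util.concurrent.Callable;

    import org.apache.hadoop.hbase.util.RetryCounter;
    import org.apache.hadoop.hbase.util.RetryCounterFactory;

    public class ZkRetrySketch {
      static <T> T runWithRetries(Callable<T> op, int maxRetries, int retryIntervalMillis)
          throws Exception {
        // maxRetries + 1 attempts: one initial try plus maxRetries retries.
        RetryCounter retryCounter =
            new RetryCounterFactory(maxRetries + 1, retryIntervalMillis).create();
        while (true) {
          try {
            return op.call();                  // the guarded operation
          } catch (Exception e) {
            if (!retryCounter.shouldRetry()) {
              throw e;                         // out of attempts
            }
          }
          retryCounter.sleepUntilNextRetry();  // sleeps and bumps the attempt count itself
        }
      }
    }
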
@@ -26,44 +26,150 @@ import org.apache.hadoop.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class RetryCounter {
-  private static final Log LOG = LogFactory.getLog(RetryCounter.class);
-  private final int maxRetries;
-  private int retriesRemaining;
-  private final int retryIntervalMillis;
-  private final TimeUnit timeUnit;
 
-  public RetryCounter(int maxRetries,
-  int retryIntervalMillis, TimeUnit timeUnit) {
-    this.maxRetries = maxRetries;
-    this.retriesRemaining = maxRetries;
-    this.retryIntervalMillis = retryIntervalMillis;
-    this.timeUnit = timeUnit;
+  /**
+   * Configuration for a retry counter
+   */
+  public static class RetryConfig {
+    private int maxAttempts;
+    private long sleepInterval;
+    private long maxSleepTime;
+    private TimeUnit timeUnit;
+    private BackoffPolicy backoffPolicy;
+
+    private static final BackoffPolicy DEFAULT_BACKOFF_POLICY = new ExponentialBackoffPolicy();
+
+    public RetryConfig() {
+      maxAttempts = 1;
+      sleepInterval = 1000;
+      maxSleepTime = -1;
+      timeUnit = TimeUnit.MILLISECONDS;
+      backoffPolicy = DEFAULT_BACKOFF_POLICY;
+    }
   }
 
-  public int getMaxRetries() {
-    return maxRetries;
+    public RetryConfig(int maxAttempts, long sleepInterval, long maxSleepTime,
+        TimeUnit timeUnit, BackoffPolicy backoffPolicy) {
+      this.maxAttempts = maxAttempts;
+      this.sleepInterval = sleepInterval;
+      this.maxSleepTime = maxSleepTime;
+      this.timeUnit = timeUnit;
+      this.backoffPolicy = backoffPolicy;
+    }
   }
 
+    public RetryConfig setBackoffPolicy(BackoffPolicy backoffPolicy) {
+      this.backoffPolicy = backoffPolicy;
+      return this;
+    }
+
+    public RetryConfig setMaxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public RetryConfig setMaxSleepTime(long maxSleepTime) {
+      this.maxSleepTime = maxSleepTime;
+      return this;
+    }
+
+    public RetryConfig setSleepInterval(long sleepInterval) {
+      this.sleepInterval = sleepInterval;
+      return this;
+    }
+
+    public RetryConfig setTimeUnit(TimeUnit timeUnit) {
+      this.timeUnit = timeUnit;
+      return this;
+    }
+
+    public int getMaxAttempts() {
+      return maxAttempts;
+    }
+
+    public long getMaxSleepTime() {
+      return maxSleepTime;
+    }
+
+    public long getSleepInterval() {
+      return sleepInterval;
+    }
+
+    public TimeUnit getTimeUnit() {
+      return timeUnit;
+    }
+
+    public BackoffPolicy getBackoffPolicy() {
+      return backoffPolicy;
+    }
+  }
+
   /**
-   * Sleep for a exponentially back off time
+   * Policy for calculating sleeping intervals between retry attempts
    */
+  public static class BackoffPolicy {
+    public long getBackoffTime(RetryConfig config, int attempts) {
+      return config.getSleepInterval();
+    }
+  }
+
+  public static class ExponentialBackoffPolicy extends BackoffPolicy {
+    @Override
+    public long getBackoffTime(RetryConfig config, int attempts) {
+      long backoffTime = (long) (config.getSleepInterval() * Math.pow(2, attempts));
+      return backoffTime;
+    }
+  }
+
+  public static class ExponentialBackoffPolicyWithLimit extends ExponentialBackoffPolicy {
+    @Override
+    public long getBackoffTime(RetryConfig config, int attempts) {
+      long backoffTime = super.getBackoffTime(config, attempts);
+      return config.getMaxSleepTime() > 0 ? Math.min(backoffTime, config.getMaxSleepTime()) : backoffTime;
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(RetryCounter.class);
+
+  private RetryConfig retryConfig;
+  private int attempts;
+
+  public RetryCounter(int maxAttempts, long sleepInterval, TimeUnit timeUnit) {
+    this(new RetryConfig(maxAttempts, sleepInterval, -1, timeUnit, new ExponentialBackoffPolicy()));
+  }
+
+  public RetryCounter(RetryConfig retryConfig) {
+    this.attempts = 0;
+    this.retryConfig = retryConfig;
+  }
+
+  public int getMaxAttempts() {
+    return retryConfig.getMaxAttempts();
+  }
+
+  /**
+   * Sleep for a back off time as supplied by the backoff policy, and increases the attempts
+   * @throws InterruptedException
+   */
   public void sleepUntilNextRetry() throws InterruptedException {
     int attempts = getAttemptTimes();
-    long sleepTime = (long) (retryIntervalMillis * Math.pow(2, attempts));
+    long sleepTime = retryConfig.backoffPolicy.getBackoffTime(retryConfig, attempts);
     LOG.info("Sleeping " + sleepTime + "ms before retry #" + attempts + "...");
-    timeUnit.sleep(sleepTime);
+    retryConfig.getTimeUnit().sleep(sleepTime);
+    useRetry();
   }
 
   public boolean shouldRetry() {
-    return retriesRemaining > 0;
+    return attempts < retryConfig.getMaxAttempts();
   }
 
   public void useRetry() {
-    retriesRemaining--;
+    attempts++;
   }
 
+  public boolean isRetry() {
+    return attempts > 0;
+  }
+
   public int getAttemptTimes() {
-    return maxRetries-retriesRemaining+1;
+    return attempts;
   }
 }
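
The rewritten RetryCounter above replaces the fixed exponential-backoff fields with a RetryConfig plus pluggable BackoffPolicy implementations (constant, exponential, and exponential capped by a maximum sleep). Below is a minimal sketch of how the new API could be driven, using only classes added in this hunk; the attempt and sleep values are arbitrary.

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hbase.util.RetryCounter;
    import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
    import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
    import org.apache.hadoop.hbase.util.RetryCounterFactory;

    public class RetryConfigExample {
      public static void main(String[] args) throws InterruptedException {
        // 10 attempts, 1s base sleep, exponential backoff capped at 30s.
        RetryConfig config = new RetryConfig()
            .setMaxAttempts(10)
            .setSleepInterval(1000)
            .setMaxSleepTime(30000)
            .setTimeUnit(TimeUnit.MILLISECONDS)
            .setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());

        RetryCounter counter = new RetryCounterFactory(config).create();
        while (counter.shouldRetry()) {
          // attempt the operation here; on success, break out of the loop
          counter.sleepUntilNextRetry(); // sleeps per the policy and bumps the attempt count
        }
      }
    }

Because shouldRetry() now compares attempts against maxAttempts, a config built for n attempts allows one initial try plus n-1 retries, which is why callers such as RecoverableZooKeeper pass maxRetries+1.
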
@@ -21,20 +21,27 @@ package org.apache.hadoop.hbase.util;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicy;
+import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
 
 @InterfaceAudience.Private
 public class RetryCounterFactory {
-  private final int maxRetries;
-  private final int retryIntervalMillis;
+  private final RetryConfig retryConfig;
 
-  public RetryCounterFactory(int maxRetries, int retryIntervalMillis) {
-    this.maxRetries = maxRetries;
-    this.retryIntervalMillis = retryIntervalMillis;
+  public RetryCounterFactory(int maxAttempts, int sleepIntervalMillis) {
+    this(new RetryConfig(
+      maxAttempts,
+      sleepIntervalMillis,
+      -1,
+      TimeUnit.MILLISECONDS,
+      new ExponentialBackoffPolicy()));
   }
 
+  public RetryCounterFactory(RetryConfig retryConfig) {
+    this.retryConfig = retryConfig;
+  }
+
   public RetryCounter create() {
-    return new RetryCounter(
-      maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS
-    );
+    return new RetryCounter(retryConfig);
   }
 }

@@ -27,6 +27,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.util.Shell;
 
 /**
@@ -48,6 +51,14 @@ public class HBaseClusterManager extends ClusterManager {
   private static final String DEFAULT_TUNNEL_CMD = "/usr/bin/ssh %1$s %2$s%3$s%4$s \"%5$s\"";
   private String tunnelCmd;
 
+  private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
+  private static final int DEFAULT_RETRY_ATTEMPTS = 5;
+
+  private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
+  private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;
+
+  protected RetryCounterFactory retryCounterFactory;
+
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
@@ -68,6 +79,10 @@ public class HBaseClusterManager extends ClusterManager {
         (sshOptions != null && sshOptions.length() > 0)) {
       LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
     }
+
+    this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
+        .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
+        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
   }
 
   /**
@@ -184,7 +199,15 @@ public class HBaseClusterManager extends ClusterManager {
     LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);
 
     RemoteShell shell = new RemoteShell(hostname, cmd);
+    try {
       shell.execute();
+    } catch (Shell.ExitCodeException ex) {
+      // capture the stdout of the process as well.
+      String output = shell.getOutput();
+      // add output for the ExitCodeException.
+      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
+          + ", stdout: " + output);
+    }
 
     LOG.info("Executed remote command, exit code:" + shell.getExitCode()
         + " , output:" + shell.getOutput());
@@ -192,8 +215,37 @@ public class HBaseClusterManager extends ClusterManager {
     return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput());
   }
 
+  private Pair<Integer, String> execWithRetries(String hostname, String... cmd)
+      throws IOException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return exec(hostname, cmd);
+      } catch (IOException e) {
+        retryOrThrow(retryCounter, e, hostname, cmd);
+      }
+      try {
+        retryCounter.sleepUntilNextRetry();
+      } catch (InterruptedException ex) {
+        // ignore
+        LOG.warn("Sleep Interrupted:" + ex);
+      }
+    }
+  }
+
+  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
+      String hostname, String[] cmd) throws E {
+    if (retryCounter.shouldRetry()) {
+      LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
+        + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
+        + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
+      return;
+    }
+    throw ex;
+  }
+
   private void exec(String hostname, ServiceType service, Operation op) throws IOException {
-    exec(hostname, getCommandProvider(service).getCommand(service, op));
+    execWithRetries(hostname, getCommandProvider(service).getCommand(service, op));
   }
 
   @Override
@@ -213,12 +265,12 @@ public class HBaseClusterManager extends ClusterManager {
 
   @Override
   public void signal(ServiceType service, String signal, String hostname) throws IOException {
-    exec(hostname, getCommandProvider(service).signalCommand(service, signal));
+    execWithRetries(hostname, getCommandProvider(service).signalCommand(service, signal));
   }
 
   @Override
   public boolean isRunning(ServiceType service, String hostname) throws IOException {
-    String ret = exec(hostname, getCommandProvider(service).isRunningCommand(service))
+    String ret = execWithRetries(hostname, getCommandProvider(service).isRunningCommand(service))
       .getSecond();
     return ret.length() > 0;
   }
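
The HBaseClusterManager changes above wrap every remote SSH command (start/stop, signal, isRunning) in execWithRetries(), which loops on exec() and defers to retryOrThrow() until the RetryCounter gives up. The retry budget comes from the two new configuration keys. A small sketch of tuning them for an integration-test run, assuming only those keys; the values are arbitrary.

    import org.apache.hadoop.conf.Configuration;

    public class RetryKnobsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Allow 10 attempts with a 2s base sleep between them.
        conf.setInt("hbase.it.clustermanager.retry.attempts", 10);
        conf.setLong("hbase.it.clustermanager.retry.sleep.interval", 2000);
        // HBaseClusterManager.setConf(conf) then builds its RetryCounterFactory from these values.
      }
    }
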
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.chaos.actions;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang.math.RandomUtils;
@@ -34,6 +35,7 @@ public class MoveRegionsOfTableAction extends Action {
   private final long sleepTime;
   private final byte[] tableNameBytes;
   private final String tableName;
+  private final long maxTime;
 
   public MoveRegionsOfTableAction(String tableName) {
     this(-1, tableName);
@@ -43,6 +45,7 @@ public class MoveRegionsOfTableAction extends Action {
     this.sleepTime = sleepTime;
     this.tableNameBytes = Bytes.toBytes(tableName);
     this.tableName = tableName;
+    this.maxTime = 10 * 60 * 1000; // 10 min default
   }
 
   @Override
@@ -62,6 +65,9 @@ public class MoveRegionsOfTableAction extends Action {
       return;
     }
 
+    Collections.shuffle(regions);
+
+    long start = System.currentTimeMillis();
     for (HRegionInfo regionInfo:regions) {
       try {
         String destServerName =
@@ -74,6 +80,12 @@ public class MoveRegionsOfTableAction extends Action {
         if (sleepTime > 0) {
           Thread.sleep(sleepTime);
         }
+
+        // put a limit on max num regions. Otherwise, this won't finish
+        // with a sleep time of 10sec, 100 regions will finish in 16min
+        if (System.currentTimeMillis() - start > maxTime) {
+          break;
+        }
       }
     }
   }
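
MoveRegionsOfTableAction now shuffles the region list and enforces a wall-clock budget (maxTime, 10 minutes by default) so a single invocation of the chaos action cannot run indefinitely; as the in-line comment notes, 100 regions with a 10-second sleep would otherwise take roughly 16 minutes of sleeping alone. A generic sketch of that time-bounded loop pattern; the class and method names are illustrative, not part of the commit.

    import java.util.Collections;
    import java.util.List;

    public class TimeBoundedLoopSketch {
      static void processWithBudget(List<String> items, long maxTimeMs) throws InterruptedException {
        Collections.shuffle(items);               // randomize order, as the action now does
        long start = System.currentTimeMillis();
        for (String item : items) {
          // ... act on the item (e.g. move a region) ...
          Thread.sleep(100);                      // stand-in for the per-item sleepTime
          if (System.currentTimeMillis() - start > maxTimeMs) {
            break;                                // give up once the budget is spent
          }
        }
      }
    }
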
@@ -145,6 +145,7 @@ public class PolicyBasedChaosMonkey extends ChaosMonkey {
       return;
     }
     for (Thread monkeyThread : monkeyThreads) {
+      // TODO: bound the wait time per policy
      monkeyThread.join();
     }
   }

@@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -81,7 +80,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.security.User;
@@ -2034,7 +2032,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    RetryCounter retrier = new RetryCounter(numRetries, (int)pause, TimeUnit.MICROSECONDS);
+    RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MICROSECONDS);
     while(retrier.shouldRetry()) {
       int index = getMiniHBaseCluster().getServerWith(firstrow);
       if (index != -1) {