diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 0121781e858..565ca3bf666 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -111,7 +111,7 @@ public class RecoverableZooKeeper { // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); this.retryCounterFactory = - new RetryCounterFactory(maxRetries, retryIntervalMillis); + new RetryCounterFactory(maxRetries+1, retryIntervalMillis); if (identifier == null || identifier.length() == 0) { // the identifier = processID@hostName @@ -177,7 +177,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); isRetry = true; } } finally { @@ -211,7 +210,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); } } finally { if (traceScope != null) traceScope.close(); @@ -244,7 +242,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); } } finally { if (traceScope != null) traceScope.close(); @@ -256,7 +253,7 @@ public class RecoverableZooKeeper { LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e); if (!retryCounter.shouldRetry()) { LOG.error("ZooKeeper " + opName + " failed after " - + retryCounter.getMaxRetries() + " retries"); + + retryCounter.getMaxAttempts() + " attempts"); throw e; } } @@ -287,7 +284,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); } } finally { if (traceScope != null) traceScope.close(); @@ -320,7 +316,6 @@ public class RecoverableZooKeeper { } } 
retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); } } finally { if (traceScope != null) traceScope.close(); @@ -354,7 +349,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); } } finally { if (traceScope != null) traceScope.close(); @@ -388,7 +382,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); } } finally { if (traceScope != null) traceScope.close(); @@ -440,7 +433,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); isRetry = true; } } finally { @@ -528,7 +520,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); isRetry = true; } } @@ -563,7 +554,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); } } /** @@ -620,7 +610,6 @@ public class RecoverableZooKeeper { } } retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); } } finally { if (traceScope != null) traceScope.close(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java index 7790362f115..ea025d1d747 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java @@ -26,44 +26,150 @@ import org.apache.hadoop.classification.InterfaceAudience; @InterfaceAudience.Private public class RetryCounter { - private static final Log LOG = LogFactory.getLog(RetryCounter.class); - private final int maxRetries; - private int retriesRemaining; - private final int retryIntervalMillis; - private final TimeUnit timeUnit; - public RetryCounter(int maxRetries, - int retryIntervalMillis, TimeUnit timeUnit) { - this.maxRetries = maxRetries; - this.retriesRemaining = maxRetries; - this.retryIntervalMillis = 
retryIntervalMillis; - this.timeUnit = timeUnit; - } + /** + * Configuration for a retry counter + */ + public static class RetryConfig { + private int maxAttempts; + private long sleepInterval; + private long maxSleepTime; + private TimeUnit timeUnit; + private BackoffPolicy backoffPolicy; - public int getMaxRetries() { - return maxRetries; + private static final BackoffPolicy DEFAULT_BACKOFF_POLICY = new ExponentialBackoffPolicy(); + + public RetryConfig() { + maxAttempts = 1; + sleepInterval = 1000; + maxSleepTime = -1; + timeUnit = TimeUnit.MILLISECONDS; + backoffPolicy = DEFAULT_BACKOFF_POLICY; + } + + public RetryConfig(int maxAttempts, long sleepInterval, long maxSleepTime, + TimeUnit timeUnit, BackoffPolicy backoffPolicy) { + this.maxAttempts = maxAttempts; + this.sleepInterval = sleepInterval; + this.maxSleepTime = maxSleepTime; + this.timeUnit = timeUnit; + this.backoffPolicy = backoffPolicy; + } + + public RetryConfig setBackoffPolicy(BackoffPolicy backoffPolicy) { + this.backoffPolicy = backoffPolicy; + return this; + } + + public RetryConfig setMaxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public RetryConfig setMaxSleepTime(long maxSleepTime) { + this.maxSleepTime = maxSleepTime; + return this; + } + + public RetryConfig setSleepInterval(long sleepInterval) { + this.sleepInterval = sleepInterval; + return this; + } + + public RetryConfig setTimeUnit(TimeUnit timeUnit) { + this.timeUnit = timeUnit; + return this; + } + + public int getMaxAttempts() { + return maxAttempts; + } + + public long getMaxSleepTime() { + return maxSleepTime; + } + + public long getSleepInterval() { + return sleepInterval; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + public BackoffPolicy getBackoffPolicy() { + return backoffPolicy; + } } /** - * Sleep for a exponentially back off time + * Policy for calculating sleeping intervals between retry attempts + */ + public static class BackoffPolicy { + public long 
getBackoffTime(RetryConfig config, int attempts) { + return config.getSleepInterval(); + } + } + + public static class ExponentialBackoffPolicy extends BackoffPolicy { + @Override + public long getBackoffTime(RetryConfig config, int attempts) { + long backoffTime = (long) (config.getSleepInterval() * Math.pow(2, attempts)); + return backoffTime; + } + } + + public static class ExponentialBackoffPolicyWithLimit extends ExponentialBackoffPolicy { + @Override + public long getBackoffTime(RetryConfig config, int attempts) { + long backoffTime = super.getBackoffTime(config, attempts); + return config.getMaxSleepTime() > 0 ? Math.min(backoffTime, config.getMaxSleepTime()) : backoffTime; + } + } + + private static final Log LOG = LogFactory.getLog(RetryCounter.class); + + private RetryConfig retryConfig; + private int attempts; + + public RetryCounter(int maxAttempts, long sleepInterval, TimeUnit timeUnit) { + this(new RetryConfig(maxAttempts, sleepInterval, -1, timeUnit, new ExponentialBackoffPolicy())); + } + + public RetryCounter(RetryConfig retryConfig) { + this.attempts = 0; + this.retryConfig = retryConfig; + } + + public int getMaxAttempts() { + return retryConfig.getMaxAttempts(); + } + + /** + * Sleep for a back off time as supplied by the backoff policy, and increases the attempts * @throws InterruptedException */ public void sleepUntilNextRetry() throws InterruptedException { int attempts = getAttemptTimes(); - long sleepTime = (long) (retryIntervalMillis * Math.pow(2, attempts)); + long sleepTime = retryConfig.backoffPolicy.getBackoffTime(retryConfig, attempts); LOG.info("Sleeping " + sleepTime + "ms before retry #" + attempts + "..."); - timeUnit.sleep(sleepTime); + retryConfig.getTimeUnit().sleep(sleepTime); + useRetry(); } public boolean shouldRetry() { - return retriesRemaining > 0; + return attempts < retryConfig.getMaxAttempts(); } public void useRetry() { - retriesRemaining--; + attempts++; } - + + public boolean isRetry() { + return attempts > 0; + } + 
public int getAttemptTimes() { - return maxRetries-retriesRemaining+1; + return attempts; } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java index 59edf968bb0..9beb14eac6c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java @@ -21,20 +21,27 @@ package org.apache.hadoop.hbase.util; import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicy; +import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; @InterfaceAudience.Private public class RetryCounterFactory { - private final int maxRetries; - private final int retryIntervalMillis; + private final RetryConfig retryConfig; - public RetryCounterFactory(int maxRetries, int retryIntervalMillis) { - this.maxRetries = maxRetries; - this.retryIntervalMillis = retryIntervalMillis; + public RetryCounterFactory(int maxAttempts, int sleepIntervalMillis) { + this(new RetryConfig( + maxAttempts, + sleepIntervalMillis, + -1, + TimeUnit.MILLISECONDS, + new ExponentialBackoffPolicy())); + } + + public RetryCounterFactory(RetryConfig retryConfig) { + this.retryConfig = retryConfig; } public RetryCounter create() { - return new RetryCounter( - maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS - ); + return new RetryCounter(retryConfig); } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java index b744ae0e3c2..a96a4b11653 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java @@ -27,6 +27,9 @@ import 
org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; +import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.util.Shell; /** @@ -48,6 +51,14 @@ public class HBaseClusterManager extends ClusterManager { private static final String DEFAULT_TUNNEL_CMD = "/usr/bin/ssh %1$s %2$s%3$s%4$s \"%5$s\""; private String tunnelCmd; + private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts"; + private static final int DEFAULT_RETRY_ATTEMPTS = 5; + + private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval"; + private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000; + + protected RetryCounterFactory retryCounterFactory; + @Override public void setConf(Configuration conf) { super.setConf(conf); @@ -68,6 +79,10 @@ public class HBaseClusterManager extends ClusterManager { (sshOptions != null && sshOptions.length() > 0)) { LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]"); } + + this.retryCounterFactory = new RetryCounterFactory(new RetryConfig() + .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS)) + .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL))); } /** @@ -184,7 +199,15 @@ public class HBaseClusterManager extends ClusterManager { LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname); RemoteShell shell = new RemoteShell(hostname, cmd); - shell.execute(); + try { + shell.execute(); + } catch (Shell.ExitCodeException ex) { + // capture the stdout of the process as well. + String output = shell.getOutput(); + // add output for the ExitCodeException. 
+ throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage() + + ", stdout: " + output); + } LOG.info("Executed remote command, exit code:" + shell.getExitCode() + " , output:" + shell.getOutput()); @@ -192,8 +215,37 @@ return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput()); } + private Pair<Integer, String> execWithRetries(String hostname, String... cmd) + throws IOException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + return exec(hostname, cmd); + } catch (IOException e) { + retryOrThrow(retryCounter, e, hostname, cmd); + } + try { + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException ex) { + // ignore + LOG.warn("Sleep Interrupted:" + ex); + } + } + } + + private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, + String hostname, String[] cmd) throws E { + if (retryCounter.shouldRetry()) { + LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname + + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: " + + retryCounter.getMaxAttempts() + ". 
Exception: " + ex.getMessage()); + return; + } + throw ex; + } + private void exec(String hostname, ServiceType service, Operation op) throws IOException { - exec(hostname, getCommandProvider(service).getCommand(service, op)); + execWithRetries(hostname, getCommandProvider(service).getCommand(service, op)); } @Override @@ -213,12 +265,12 @@ public class HBaseClusterManager extends ClusterManager { @Override public void signal(ServiceType service, String signal, String hostname) throws IOException { - exec(hostname, getCommandProvider(service).signalCommand(service, signal)); + execWithRetries(hostname, getCommandProvider(service).signalCommand(service, signal)); } @Override public boolean isRunning(ServiceType service, String hostname) throws IOException { - String ret = exec(hostname, getCommandProvider(service).isRunningCommand(service)) + String ret = execWithRetries(hostname, getCommandProvider(service).isRunningCommand(service)) .getSecond(); return ret.length() > 0; } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRegionsOfTableAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRegionsOfTableAction.java index 43e7eed9fc8..e980d68a90e 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRegionsOfTableAction.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRegionsOfTableAction.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.chaos.actions; import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.commons.lang.math.RandomUtils; @@ -34,6 +35,7 @@ public class MoveRegionsOfTableAction extends Action { private final long sleepTime; private final byte[] tableNameBytes; private final String tableName; + private final long maxTime; public MoveRegionsOfTableAction(String tableName) { this(-1, tableName); @@ -43,6 +45,7 @@ public class MoveRegionsOfTableAction extends Action { this.sleepTime = sleepTime; 
this.tableNameBytes = Bytes.toBytes(tableName); this.tableName = tableName; + this.maxTime = 10 * 60 * 1000; // 10 min default } @Override @@ -62,6 +65,9 @@ public class MoveRegionsOfTableAction extends Action { return; } + Collections.shuffle(regions); + + long start = System.currentTimeMillis(); for (HRegionInfo regionInfo:regions) { try { String destServerName = @@ -74,6 +80,12 @@ public class MoveRegionsOfTableAction extends Action { if (sleepTime > 0) { Thread.sleep(sleepTime); } + + // put a limit on max num regions. Otherwise, this won't finish + // with a sleep time of 10sec, 100 regions will finish in 16min + if (System.currentTimeMillis() - start > maxTime) { + break; + } } } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java index e2671103734..f42f903473e 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java @@ -145,6 +145,7 @@ public class PolicyBasedChaosMonkey extends ChaosMonkey { return; } for (Thread monkeyThread : monkeyThreads) { + // TODO: bound the wait time per policy monkeyThread.join(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 5f59f7f7117..d692adc6348 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Put; import 
org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -81,7 +80,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.security.User; @@ -186,7 +184,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { { new Boolean(false) }, { new Boolean(true) } }); - + /** This is for unit tests parameterized with a single boolean. */ public static final List MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination() ; /** Compression algorithms to use in testing */ @@ -1486,13 +1484,13 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { // ========================================================================== // Canned table and table descriptor creation // TODO replace HBaseTestCase - + public final static byte [] fam1 = Bytes.toBytes("colfamily11"); public final static byte [] fam2 = Bytes.toBytes("colfamily21"); public final static byte [] fam3 = Bytes.toBytes("colfamily31"); public static final byte[][] COLUMNS = {fam1, fam2, fam3}; private static final int MAXVERSIONS = 3; - + public static final char FIRST_CHAR = 'a'; public static final char LAST_CHAR = 'z'; public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; @@ -1569,7 +1567,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, hlog); } - + /** * @param tableName * @param startKey @@ -2034,7 +2032,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 
int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - RetryCounter retrier = new RetryCounter(numRetries, (int)pause, TimeUnit.MICROSECONDS); + RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MICROSECONDS); while(retrier.shouldRetry()) { int index = getMiniHBaseCluster().getServerWith(firstrow); if (index != -1) { @@ -2159,12 +2157,12 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { /** * Create a stubbed out RegionServerService, mainly for getting FS. */ - public RegionServerServices createMockRegionServerService() throws IOException { + public RegionServerServices createMockRegionServerService() throws IOException { return createMockRegionServerService((ServerName)null); } /** - * Create a stubbed out RegionServerService, mainly for getting FS. + * Create a stubbed out RegionServerService, mainly for getting FS. * This version is used by TestTokenAuthentication */ public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws IOException { @@ -2175,7 +2173,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** - * Create a stubbed out RegionServerService, mainly for getting FS. + * Create a stubbed out RegionServerService, mainly for getting FS. * This version is used by TestOpenRegionHandler */ public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {