HBASE-15615 Wrong sleep time when RegionServerCallable need retry (Guanghao Zhang)
parent 77f511fceb
commit 0042d6d4c8
@@ -54,6 +54,9 @@ public class ConnectionUtils {
     if (ntries >= HConstants.RETRY_BACKOFF.length) {
       ntries = HConstants.RETRY_BACKOFF.length - 1;
     }
+    if (ntries < 0) {
+      ntries = 0;
+    }
     long normalPause = pause * HConstants.RETRY_BACKOFF[ntries];
     long jitter = (long)(normalPause * RANDOM.nextFloat() * 0.01f); // 1% possible jitter
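For reference, a minimal standalone sketch of how getPauseTime behaves after this hunk. The backoff table values and the surrounding class name are illustrative (the real table lives in HConstants.RETRY_BACKOFF), and the return statement is assumed from the method's unchanged tail:

import java.util.Random;

// Illustrative sketch only: mirrors the clamping above so a retry count of 0 (or below)
// maps to RETRY_BACKOFF[0] and an over-large count maps to the last table entry.
public class PauseTimeSketch {
  // Assumed default backoff table; the real values live in HConstants.RETRY_BACKOFF.
  private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 300};
  private static final Random RANDOM = new Random();

  static long getPauseTime(long pause, int ntries) {
    if (ntries >= RETRY_BACKOFF.length) {
      ntries = RETRY_BACKOFF.length - 1;
    }
    if (ntries < 0) {
      ntries = 0; // the new guard: negative counts no longer index before the table
    }
    long normalPause = pause * RETRY_BACKOFF[ntries];
    long jitter = (long) (normalPause * RANDOM.nextFloat() * 0.01f); // 1% possible jitter
    return normalPause + jitter; // assumed tail of the real method
  }

  public static void main(String[] args) {
    System.out.println(getPauseTime(100, 0)); // ~100 ms: first retry uses RETRY_BACKOFF[0]
    System.out.println(getPauseTime(100, 3)); // ~500 ms
  }
}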
@@ -138,7 +138,7 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
   @Override
   public long sleep(long pause, int tries) {
-    long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
+    long sleep = ConnectionUtils.getPauseTime(pause, tries);
     if (sleep < MIN_WAIT_DEAD_SERVER
         && (location == null || connection.isDeadServer(location.getServerName()))) {
       sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
@@ -134,8 +134,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
   @Override
   public long sleep(long pause, int tries) {
-    // Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
-    long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
+    long sleep = ConnectionUtils.getPauseTime(pause, tries);
     if (sleep < MIN_WAIT_DEAD_SERVER
         && (location == null || getConnection().isDeadServer(location.getServerName()))) {
       sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
@@ -159,8 +159,8 @@ public class RpcRetryingCaller<T> {
         }
         // If the server is dead, we need to wait a little before retrying, to give
         // a chance to the regions to be
-        // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
-        expectedSleep = callable.sleep(pause, tries + 1);
+        // get right pause time, start by RETRY_BACKOFF[0] * pause
+        expectedSleep = callable.sleep(pause, tries);

        // If, after the planned sleep, there won't be enough time left, we stop now.
        long duration = singleCallDuration(expectedSleep);
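The three caller-side hunks above make the same adjustment: the retry counter has not been bumped yet when sleep() is computed, so passing tries instead of tries + 1 starts the backoff sequence at RETRY_BACKOFF[0]. A small illustrative calculation (the pause value and table prefix are assumed, not taken from the patch):

public class FirstRetrySleepExample {
  public static void main(String[] args) {
    int[] backoff = {1, 2, 3, 5, 10, 20, 40, 100}; // assumed default RETRY_BACKOFF prefix
    long pause = 100; // hbase.client.pause in ms, illustrative value
    int tries = 0;    // first retry; the counter has not been incremented yet
    // Before the patch the first sleep skipped slot 0:
    System.out.println("old first sleep ~" + pause * backoff[tries + 1] + " ms"); // ~200 ms
    // After the patch it starts at slot 0:
    System.out.println("new first sleep ~" + pause * backoff[tries] + " ms");     // ~100 ms
  }
}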
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.client;

+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -53,4 +54,23 @@ public class TestConnectionUtils {
     assertTrue(retyTimeSet.size() > (retries.length * 0.80));
   }

+  @Test
+  public void testGetPauseTime() {
+    long pauseTime;
+    long baseTime = 100;
+    pauseTime = ConnectionUtils.getPauseTime(baseTime, -1);
+    assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[0]));
+    assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[0] * 1.01f));
+
+    for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
+      pauseTime = ConnectionUtils.getPauseTime(baseTime, i);
+      assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
+      assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+    }
+
+    int length = HConstants.RETRY_BACKOFF.length;
+    pauseTime = ConnectionUtils.getPauseTime(baseTime, length);
+    assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[length - 1]));
+    assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[length - 1] * 1.01f));
+  }
 }
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -120,14 +121,25 @@ public class TestHCM {
    */
   public static class SleepAndFailFirstTime extends BaseRegionObserver {
     static final AtomicLong ct = new AtomicLong(0);
+    static final String SLEEP_TIME_CONF_KEY =
+        "hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
+    static final long DEFAULT_SLEEP_TIME = 20000;
+    static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);

     public SleepAndFailFirstTime() {
     }
+
+    @Override
+    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
+      RegionCoprocessorEnvironment env = c.getEnvironment();
+      Configuration conf = env.getConfiguration();
+      sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
+    }

     @Override
     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
         final Get get, final List<Cell> results) throws IOException {
-      Threads.sleep(20000);
+      Threads.sleep(sleepTime.get());
       if (ct.incrementAndGet() == 1){
         throw new IOException("first call I fail");
       }
@@ -357,6 +369,96 @@ public class TestHCM {
     }
   }

+  /**
+   * Test starting from 0 index when RpcRetryingCaller calculate the backoff time.
+   */
+  @Test
+  public void testRpcRetryingCallerSleep() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcRetryingCallerSleep");
+    hdt.addCoprocessorWithSpec("|" + SleepAndFailFirstTime.class.getName() + "||"
+        + SleepAndFailFirstTime.SLEEP_TIME_CONF_KEY + "=2000");
+    TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
+
+    Connection connection = ConnectionFactory.createConnection(c);
+    Table t = connection.getTable(TableName.valueOf("HCM-testRpcRetryingCallerSleep"));
+    if (t instanceof HTable) {
+      HTable table = (HTable) t;
+      table.setOperationTimeout(8000);
+      // Check that it works. Because 2s + 3s * RETRY_BACKOFF[0] + 2s < 8s
+      table.get(new Get(FAM_NAM));
+
+      // Resetting and retrying.
+      SleepAndFailFirstTime.ct.set(0);
+      try {
+        table.setOperationTimeout(6000);
+        // Will fail this time. After sleep, there are not enough time for second retry
+        // Beacuse 2s + 3s + 2s > 6s
+        table.get(new Get(FAM_NAM));
+        Assert.fail("We expect an exception here");
+      } catch (SocketTimeoutException e) {
+        LOG.info("We received an exception, as expected ", e);
+      } catch (IOException e) {
+        Assert.fail("Wrong exception:" + e.getMessage());
+      } finally {
+        table.close();
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void testCallableSleep() throws Exception {
+    long pauseTime;
+    long baseTime = 100;
+    TableName tableName = TableName.valueOf("HCM-testCallableSleep");
+    HTable table = TEST_UTIL.createTable(tableName, FAM_NAM);
+    RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
+        TEST_UTIL.getConnection(), tableName, ROW) {
+      public Object call(int timeout) throws IOException {
+        return null;
+      }
+    };
+
+    regionServerCallable.prepare(false);
+    for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
+      pauseTime = regionServerCallable.sleep(baseTime, i);
+      assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
+      assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+    }
+
+    RegionAdminServiceCallable<Object> regionAdminServiceCallable =
+        new RegionAdminServiceCallable<Object>(
+        (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
+            TEST_UTIL.getConfiguration()), tableName, ROW) {
+      public Object call(int timeout) throws IOException {
+        return null;
+      }
+    };
+
+    regionAdminServiceCallable.prepare(false);
+    for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
+      pauseTime = regionAdminServiceCallable.sleep(baseTime, i);
+      assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
+      assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+    }
+
+    MasterCallable masterCallable = new MasterCallable((HConnection) TEST_UTIL.getConnection()) {
+      public Object call(int timeout) throws IOException {
+        return null;
+      }
+    };
+
+    for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
+      pauseTime = masterCallable.sleep(baseTime, i);
+      assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
+      assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+    }
+  }
+
   private void testConnectionClose(boolean allowsInterrupt) throws Exception {
     TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
     TEST_UTIL.createTable(tableName, FAM_NAM).close();
@@ -1139,7 +1241,7 @@ public class TestHCM {
     }
   }

-  @Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test
+  @Test
   public void testErrorBackoffTimeCalculation() throws Exception {
     // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
     final long ANY_PAUSE = 100;
@@ -1159,40 +1261,23 @@ public class TestHCM {

       // Check some backoff values from HConstants sequence.
       tracker.reportServerError(location);
-      assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(location, ANY_PAUSE));
+      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
+          tracker.calculateBackoffTime(location, ANY_PAUSE));
       tracker.reportServerError(location);
       tracker.reportServerError(location);
       tracker.reportServerError(location);
-      assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(location, ANY_PAUSE));
+      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
+          tracker.calculateBackoffTime(location, ANY_PAUSE));

       // All of this shouldn't affect backoff for different location.
       assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
       tracker.reportServerError(diffLocation);
-      assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
+      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
+          tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));

       // Check with different base.
-      assertEqualsWithJitter(ANY_PAUSE * 10,
+      assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
           tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
-
-      // See that time from last error is taken into account. Time shift is applied after jitter,
-      // so pass the original expected backoff as the base for jitter.
-      long timeShift = (long)(ANY_PAUSE * 0.5);
-      timeMachine.setValue(timeBase + timeShift);
-      assertEqualsWithJitter((ANY_PAUSE * 5) - timeShift,
-          tracker.calculateBackoffTime(location, ANY_PAUSE), ANY_PAUSE * 2);
-
-      // However we should not go into negative.
-      timeMachine.setValue(timeBase + ANY_PAUSE * 100);
-      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
-
-      // We also should not go over the boundary; last retry would be on it.
-      long timeLeft = (long)(ANY_PAUSE * 0.5);
-      timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
-      assertTrue(tracker.canRetryMore(1));
-      tracker.reportServerError(location);
-      assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
-      timeMachine.setValue(timeBase + largeAmountOfTime);
-      assertFalse(tracker.canRetryMore(1));
     } finally {
       EnvironmentEdgeManager.reset();
     }