HBASE-15615 Wrong sleep time when RegionServerCallable need retry (Guanghao Zhang)
This commit is contained in:
parent
6904430a3d
commit
2482062d34
hbase-client/src/main/java/org/apache/hadoop/hbase/client
AbstractRegionServerCallable.javaConnectionUtils.javaRegionAdminServiceCallable.javaRpcRetryingCallerImpl.java
hbase-server/src/test/java/org/apache/hadoop/hbase/client
|
@ -100,8 +100,7 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
|
|||
|
||||
@Override
|
||||
public long sleep(long pause, int tries) {
|
||||
// Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
|
||||
long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
|
||||
long sleep = ConnectionUtils.getPauseTime(pause, tries);
|
||||
if (sleep < MIN_WAIT_DEAD_SERVER
|
||||
&& (location == null || getConnection().isDeadServer(location.getServerName()))) {
|
||||
sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
|
||||
|
|
|
@ -55,6 +55,9 @@ public final class ConnectionUtils {
|
|||
if (ntries >= HConstants.RETRY_BACKOFF.length) {
|
||||
ntries = HConstants.RETRY_BACKOFF.length - 1;
|
||||
}
|
||||
if (ntries < 0) {
|
||||
ntries = 0;
|
||||
}
|
||||
|
||||
long normalPause = pause * HConstants.RETRY_BACKOFF[ntries];
|
||||
long jitter = (long)(normalPause * RANDOM.nextFloat() * 0.01f); // 1% possible jitter
|
||||
|
|
|
@ -140,7 +140,7 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
|
|||
|
||||
@Override
|
||||
public long sleep(long pause, int tries) {
|
||||
long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
|
||||
long sleep = ConnectionUtils.getPauseTime(pause, tries);
|
||||
if (sleep < MIN_WAIT_DEAD_SERVER
|
||||
&& (location == null || connection.isDeadServer(location.getServerName()))) {
|
||||
sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
|
||||
|
|
|
@ -124,8 +124,8 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
|||
}
|
||||
// If the server is dead, we need to wait a little before retrying, to give
|
||||
// a chance to the regions to be
|
||||
// tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
|
||||
expectedSleep = callable.sleep(pause, tries + 1);
|
||||
// get right pause time, start by RETRY_BACKOFF[0] * pause
|
||||
expectedSleep = callable.sleep(pause, tries);
|
||||
|
||||
// If, after the planned sleep, there won't be enough time left, we stop now.
|
||||
long duration = singleCallDuration(expectedSleep);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Test;
|
||||
|
@ -54,4 +55,23 @@ public class TestConnectionUtils {
|
|||
assertTrue(retyTimeSet.size() > (retries.length * 0.80));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetPauseTime() {
|
||||
long pauseTime;
|
||||
long baseTime = 100;
|
||||
pauseTime = ConnectionUtils.getPauseTime(baseTime, -1);
|
||||
assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[0]));
|
||||
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[0] * 1.01f));
|
||||
|
||||
for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
|
||||
pauseTime = ConnectionUtils.getPauseTime(baseTime, i);
|
||||
assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
|
||||
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
|
||||
}
|
||||
|
||||
int length = HConstants.RETRY_BACKOFF.length;
|
||||
pauseTime = ConnectionUtils.getPauseTime(baseTime, length);
|
||||
assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[length - 1]));
|
||||
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[length - 1] * 1.01f));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
|||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
|
@ -114,14 +115,25 @@ public class TestHCM {
|
|||
*/
|
||||
public static class SleepAndFailFirstTime extends BaseRegionObserver {
|
||||
static final AtomicLong ct = new AtomicLong(0);
|
||||
static final String SLEEP_TIME_CONF_KEY =
|
||||
"hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
|
||||
static final long DEFAULT_SLEEP_TIME = 20000;
|
||||
static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
|
||||
|
||||
public SleepAndFailFirstTime() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
|
||||
RegionCoprocessorEnvironment env = c.getEnvironment();
|
||||
Configuration conf = env.getConfiguration();
|
||||
sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Get get, final List<Cell> results) throws IOException {
|
||||
Threads.sleep(20000);
|
||||
Threads.sleep(sleepTime.get());
|
||||
if (ct.incrementAndGet() == 1){
|
||||
throw new IOException("first call I fail");
|
||||
}
|
||||
|
@ -352,6 +364,95 @@ public class TestHCM {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test starting from 0 index when RpcRetryingCaller calculate the backoff time.
|
||||
*/
|
||||
@Test
|
||||
public void testRpcRetryingCallerSleep() throws Exception {
|
||||
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcRetryingCallerSleep");
|
||||
hdt.addCoprocessorWithSpec("|" + SleepAndFailFirstTime.class.getName() + "||"
|
||||
+ SleepAndFailFirstTime.SLEEP_TIME_CONF_KEY + "=2000");
|
||||
TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
|
||||
|
||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
|
||||
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
|
||||
|
||||
Connection connection = ConnectionFactory.createConnection(c);
|
||||
Table t = connection.getTable(TableName.valueOf("HCM-testRpcRetryingCallerSleep"));
|
||||
if (t instanceof HTable) {
|
||||
HTable table = (HTable) t;
|
||||
table.setOperationTimeout(8000);
|
||||
// Check that it works. Because 2s + 3s * RETRY_BACKOFF[0] + 2s < 8s
|
||||
table.get(new Get(FAM_NAM));
|
||||
|
||||
// Resetting and retrying.
|
||||
SleepAndFailFirstTime.ct.set(0);
|
||||
try {
|
||||
table.setOperationTimeout(6000);
|
||||
// Will fail this time. After sleep, there are not enough time for second retry
|
||||
// Beacuse 2s + 3s + 2s > 6s
|
||||
table.get(new Get(FAM_NAM));
|
||||
Assert.fail("We expect an exception here");
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.info("We received an exception, as expected ", e);
|
||||
} catch (IOException e) {
|
||||
Assert.fail("Wrong exception:" + e.getMessage());
|
||||
} finally {
|
||||
table.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCallableSleep() throws Exception {
|
||||
long pauseTime;
|
||||
long baseTime = 100;
|
||||
TableName tableName = TableName.valueOf("HCM-testCallableSleep");
|
||||
HTable table = TEST_UTIL.createTable(tableName, FAM_NAM);
|
||||
RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
|
||||
TEST_UTIL.getConnection(), tableName, ROW) {
|
||||
public Object call(int timeout) throws IOException {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
regionServerCallable.prepare(false);
|
||||
for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
|
||||
pauseTime = regionServerCallable.sleep(baseTime, i);
|
||||
assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
|
||||
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
|
||||
}
|
||||
|
||||
RegionAdminServiceCallable<Object> regionAdminServiceCallable =
|
||||
new RegionAdminServiceCallable<Object>(
|
||||
(ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
|
||||
TEST_UTIL.getConfiguration()), tableName, ROW) {
|
||||
public Object call(int timeout) throws IOException {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
regionAdminServiceCallable.prepare(false);
|
||||
for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
|
||||
pauseTime = regionAdminServiceCallable.sleep(baseTime, i);
|
||||
assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
|
||||
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
|
||||
}
|
||||
|
||||
MasterCallable masterCallable = new MasterCallable((HConnection) TEST_UTIL.getConnection()) {
|
||||
public Object call(int timeout) throws IOException {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
|
||||
pauseTime = masterCallable.sleep(baseTime, i);
|
||||
assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
|
||||
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
|
||||
}
|
||||
}
|
||||
|
||||
private void testConnectionClose(boolean allowsInterrupt) throws Exception {
|
||||
TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
|
||||
|
@ -1032,7 +1133,7 @@ public class TestHCM {
|
|||
}
|
||||
}
|
||||
|
||||
@Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test
|
||||
@Test
|
||||
public void testErrorBackoffTimeCalculation() throws Exception {
|
||||
// TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
|
||||
final long ANY_PAUSE = 100;
|
||||
|
@ -1052,40 +1153,23 @@ public class TestHCM {
|
|||
|
||||
// Check some backoff values from HConstants sequence.
|
||||
tracker.reportServerError(location);
|
||||
assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(location, ANY_PAUSE));
|
||||
assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
|
||||
tracker.calculateBackoffTime(location, ANY_PAUSE));
|
||||
tracker.reportServerError(location);
|
||||
tracker.reportServerError(location);
|
||||
tracker.reportServerError(location);
|
||||
assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(location, ANY_PAUSE));
|
||||
assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
|
||||
tracker.calculateBackoffTime(location, ANY_PAUSE));
|
||||
|
||||
// All of this shouldn't affect backoff for different location.
|
||||
assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
|
||||
tracker.reportServerError(diffLocation);
|
||||
assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
|
||||
assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
|
||||
tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
|
||||
|
||||
// Check with different base.
|
||||
assertEqualsWithJitter(ANY_PAUSE * 10,
|
||||
assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
|
||||
tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
|
||||
|
||||
// See that time from last error is taken into account. Time shift is applied after jitter,
|
||||
// so pass the original expected backoff as the base for jitter.
|
||||
long timeShift = (long)(ANY_PAUSE * 0.5);
|
||||
timeMachine.setValue(timeBase + timeShift);
|
||||
assertEqualsWithJitter((ANY_PAUSE * 5) - timeShift,
|
||||
tracker.calculateBackoffTime(location, ANY_PAUSE), ANY_PAUSE * 2);
|
||||
|
||||
// However we should not go into negative.
|
||||
timeMachine.setValue(timeBase + ANY_PAUSE * 100);
|
||||
assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
|
||||
|
||||
// We also should not go over the boundary; last retry would be on it.
|
||||
long timeLeft = (long)(ANY_PAUSE * 0.5);
|
||||
timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
|
||||
assertTrue(tracker.canTryMore(1));
|
||||
tracker.reportServerError(location);
|
||||
assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
|
||||
timeMachine.setValue(timeBase + largeAmountOfTime);
|
||||
assertFalse(tracker.canTryMore(1));
|
||||
} finally {
|
||||
EnvironmentEdgeManager.reset();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue