HBASE-14521 Unify the semantic of hbase.client.retries.number (Yu Li)

Nicolas Liochon 2015-10-15 10:42:40 +02:00
parent d6f3dae7e7
commit e7defd7d9a
9 changed files with 67 additions and 56 deletions
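
The change gives hbase.client.retries.number a single meaning across the client: the value counts retries only, so the total number of attempts is the configured value plus one, and 0 now means "try once, never retry". The hunks below add the "+ 1" where the setting is read, rename counters so that tries/attempts and retries are no longer mixed, and update tests and tools that relied on 1 meaning a single attempt. A minimal plain-Java illustration of that arithmetic (illustrative only, not part of the commit; the class name is made up):

// Illustrative only: the rule the patch applies everywhere.
// attempts = retries + 1; retries = 0 means a single attempt.
class RetrySemanticsSketch {
    static int maxAttempts(int configuredRetries) {
        return configuredRetries + 1;
    }

    public static void main(String[] args) {
        System.out.println("retries=0 -> attempts=" + maxAttempts(0)); // try once only
        System.out.println("retries=3 -> attempts=" + maxAttempts(3)); // 1 try + 3 retries
    }
}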

View File

@ -258,8 +258,9 @@ class AsyncProcess {
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
// how many times we could try in total, one more than retry number
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
@ -1117,7 +1118,7 @@ class AsyncProcess {
private void receiveGlobalFailure(
MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
errorsByServer.reportServerError(server);
Retry canRetry = errorsByServer.canRetryMore(numAttempt)
Retry canRetry = errorsByServer.canTryMore(numAttempt)
? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
if (tableName == null) {
@ -1253,7 +1254,7 @@ class AsyncProcess {
if (failureCount == 0) {
errorsByServer.reportServerError(server);
// We determine canRetry only once for all calls, after reporting server failure.
canRetry = errorsByServer.canRetryMore(numAttempt);
canRetry = errorsByServer.canTryMore(numAttempt);
}
++failureCount;
Retry retry = manageError(sentAction.getOriginalIndex(), row,
@ -1301,7 +1302,7 @@ class AsyncProcess {
if (failureCount == 0) {
errorsByServer.reportServerError(server);
canRetry = errorsByServer.canRetryMore(numAttempt);
canRetry = errorsByServer.canTryMore(numAttempt);
}
connection.updateCachedLocations(
tableName, region, actions.get(0).getAction().getRow(), throwable, server);
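
AsyncProcess now turns the configured retry count into an attempt budget by adding one, and asks the error tracker canTryMore(numAttempt) rather than canRetryMore. A minimal sketch of the new read path, assuming hbase-client is on the classpath (the class name is made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

class NumTriesSketch {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // The setting counts retries; AsyncProcess works in total tries.
        int retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
        int numTries = retries + 1; // one initial call plus the retries
        System.out.println("retries=" + retries + ", numTries=" + numTries);
    }
}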

View File

@ -201,8 +201,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
HConstants.DEFAULT_USE_META_REPLICAS);
this.numTries = tableConfig.getRetriesNumber();
HConstants.DEFAULT_USE_META_REPLICAS);
// how many times to try, one more than max *retry* time
this.numTries = tableConfig.getRetriesNumber() + 1;
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@ -847,13 +848,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
s.setConsistency(Consistency.TIMELINE);
}
int localNumRetries = (retry ? numTries : 1);
int maxAttempts = (retry ? numTries : 1);
for (int tries = 0; true; tries++) {
if (tries >= localNumRetries) {
if (tries >= maxAttempts) {
throw new NoServerForRegionException("Unable to find region for "
+ Bytes.toStringBinary(row) + " in " + tableName +
" after " + localNumRetries + " tries.");
" after " + tries + " tries.");
}
if (useCache) {
RegionLocations locations = getCachedLocation(tableName, row);
@ -941,12 +942,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (e instanceof RemoteException) {
e = ((RemoteException)e).unwrapRemoteException();
}
if (tries < localNumRetries - 1) {
if (tries < maxAttempts - 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("locateRegionInMeta parentTable=" +
TableName.META_TABLE_NAME + ", metaLocation=" +
", attempt=" + tries + " of " +
localNumRetries + " failed; retrying after sleep of " +
maxAttempts + " failed; retrying after sleep of " +
ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
}
} else {
@ -1087,21 +1088,27 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
new ConcurrentHashMap<ServerName, ServerErrors>();
private final long canRetryUntil;
private final int maxRetries;
private final int maxTries;// max number to try
private final long startTrackingTime;
public ServerErrorTracker(long timeout, int maxRetries) {
this.maxRetries = maxRetries;
/**
* Constructor
* @param timeout how long to wait before timeout, in unit of millisecond
* @param maxTries how many times to try
*/
public ServerErrorTracker(long timeout, int maxTries) {
this.maxTries = maxTries;
this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
this.startTrackingTime = new Date().getTime();
}
/**
* We stop to retry when we have exhausted BOTH the number of retries and the time allocated.
* We stop to retry when we have exhausted BOTH the number of tries and the time allocated.
* @param numAttempt how many times we have tried by now
*/
boolean canRetryMore(int numRetry) {
boolean canTryMore(int numAttempt) {
// If there is a single try we must not take into account the time.
return numRetry < maxRetries || (maxRetries > 1 &&
return numAttempt < maxTries || (maxTries > 1 &&
EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
}
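
ConnectionImplementation applies the same conversion (numTries = retries + 1), bounds the region-location loop by an attempt count, and renames the tracker's method to canTryMore with an attempt-count parameter. A simplified sketch of the loop shape, with a hypothetical tryOnce() lookup and a generic exception standing in for NoServerForRegionException:

class LocateLoopSketch {
    static String locate(boolean retry, int numTries) throws Exception {
        int maxAttempts = retry ? numTries : 1;
        for (int tries = 0; true; tries++) {
            if (tries >= maxAttempts) {
                // reports the tries actually made, as in the patched message
                throw new Exception("Unable to find region after " + tries + " tries.");
            }
            String location = tryOnce(); // hypothetical single meta lookup
            if (location != null) {
                return location;
            }
            // the real code sleeps ConnectionUtils.getPauseTime(pause, tries) here
        }
    }

    private static String tryOnce() {
        return null; // stand-in: pretend every lookup misses, to show the bound
    }
}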

View File

@ -84,7 +84,7 @@ public class HTableMultiplexer {
private final Configuration workerConf;
private final ClusterConnection conn;
private final ExecutorService pool;
private final int retryNum;
private final int maxAttempts;
private final int perRegionServerBufferQueueSize;
private final int maxKeyValueSize;
private final ScheduledExecutorService executor;
@ -99,8 +99,9 @@ public class HTableMultiplexer {
throws IOException {
this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
this.pool = HTable.getDefaultExecutor(conf);
this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
// how many times we could try in total, one more than retry number
this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
@ -123,7 +124,7 @@ public class HTableMultiplexer {
* @return true if the request can be accepted by its corresponding buffer queue.
*/
public boolean put(TableName tableName, final Put put) {
return put(tableName, put, this.retryNum);
return put(tableName, put, this.maxAttempts);
}
/**
@ -140,7 +141,7 @@ public class HTableMultiplexer {
List <Put> failedPuts = null;
boolean result;
for (Put put : puts) {
result = put(tableName, put, this.retryNum);
result = put(tableName, put, this.maxAttempts);
if (result == false) {
// Create the failed puts list if necessary
@ -168,8 +169,8 @@ public class HTableMultiplexer {
* Return false if the queue is already full.
* @return true if the request can be accepted by its corresponding buffer queue.
*/
public boolean put(final TableName tableName, final Put put, int retry) {
if (retry <= 0) {
public boolean put(final TableName tableName, final Put put, int maxAttempts) {
if (maxAttempts <= 0) {
return false;
}
@ -181,7 +182,7 @@ public class HTableMultiplexer {
LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
// Generate a MultiPutStatus object and offer it into the queue
PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
PutStatus s = new PutStatus(loc.getRegionInfo(), put, maxAttempts);
return queue.offer(s);
}
@ -342,12 +343,12 @@ public class HTableMultiplexer {
private static class PutStatus {
private final HRegionInfo regionInfo;
private final Put put;
private final int retryCount;
private final int maxAttempCount;
public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
public PutStatus(HRegionInfo regionInfo, Put put, int maxAttempCount) {
this.regionInfo = regionInfo;
this.put = put;
this.retryCount = retryCount;
this.maxAttempCount = maxAttempCount;
}
}
@ -441,7 +442,7 @@ public class HTableMultiplexer {
private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
// Decrease the retry count
final int retryCount = ps.retryCount - 1;
final int retryCount = ps.maxAttempCount - 1;
if (retryCount <= 0) {
// Update the failed counter and no retry any more.
@ -460,7 +461,7 @@ public class HTableMultiplexer {
final TableName tableName = ps.regionInfo.getTable();
long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
multiplexer.retryNum - retryCount - 1);
multiplexer.maxAttempts - retryCount - 1);
if (LOG.isDebugEnabled()) {
LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
}
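
HTableMultiplexer now seeds each queued put with a maximum attempt count (retries + 1) and counts it down on every resubmission, dropping the put once no attempts remain. A rough sketch of that countdown; PutStatusSketch stands in for the package-private PutStatus class:

class PutStatusSketch {
    final int maxAttemptCount; // attempts still allowed for this put

    PutStatusSketch(int maxAttemptCount) {
        this.maxAttemptCount = maxAttemptCount;
    }

    // Called after a failed flush; returns null when the attempts are used up.
    PutStatusSketch afterFailure() {
        int remaining = maxAttemptCount - 1; // the attempt that just failed is spent
        return remaining <= 0 ? null : new PutStatusSketch(remaining);
    }
}

In the real class the surviving status is re-queued after a backoff pause derived from how many resubmissions have already happened.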

View File

@ -78,13 +78,13 @@ public class RetriesExhaustedException extends IOException {
/**
* Create a new RetriesExhaustedException from the list of prior failures.
* @param numTries
* @param numRetries How many times we have retried, one less than total attempts
* @param exceptions List of exceptions that failed before giving up
*/
@InterfaceAudience.Private
public RetriesExhaustedException(final int numTries,
public RetriesExhaustedException(final int numRetries,
final List<ThrowableWithExtraContext> exceptions) {
super(getMessage(numTries, exceptions),
super(getMessage(numRetries, exceptions),
(exceptions != null && !exceptions.isEmpty() ?
exceptions.get(exceptions.size() - 1).t : null));
}
@ -94,7 +94,7 @@ public class RetriesExhaustedException extends IOException {
StringBuilder buffer = new StringBuilder("Failed contacting ");
buffer.append(callableVitals);
buffer.append(" after ");
buffer.append(numTries + 1);
buffer.append(numTries);
buffer.append(" attempts.\nExceptions:\n");
for (Throwable t : exceptions) {
buffer.append(t.toString());
@ -103,10 +103,10 @@ public class RetriesExhaustedException extends IOException {
return buffer.toString();
}
private static String getMessage(final int numTries,
private static String getMessage(final int numRetries,
final List<ThrowableWithExtraContext> exceptions) {
StringBuilder buffer = new StringBuilder("Failed after attempts=");
buffer.append(numTries + 1);
buffer.append(numRetries + 1);
buffer.append(", exceptions:\n");
for (ThrowableWithExtraContext t : exceptions) {
buffer.append(t.toString());
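
The two getMessage variants in RetriesExhaustedException now match their callers: the one handed a total try count prints it unchanged, while the one handed a retry count adds one to report attempts. A small sketch of the resulting message shapes (string building only, wording shortened):

class ExhaustedMessageSketch {
    // The caller passes the number of tries already performed.
    static String afterTries(String callableVitals, int numTries) {
        return "Failed contacting " + callableVitals + " after " + numTries + " attempts.";
    }

    // The caller passes the number of retries; attempts are retries + 1.
    static String afterRetries(int numRetries) {
        return "Failed after attempts=" + (numRetries + 1) + ", exceptions: ...";
    }
}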

View File

@ -60,7 +60,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
private final int startLogErrorsCnt;
private final long pause;
private final int retries;
private final int maxAttempts;// how many times to try
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final RetryingCallerInterceptor interceptor;
private final RetryingCallerInterceptorContext context;
@ -72,7 +72,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
public RpcRetryingCallerImpl(long pause, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {
this.pause = pause;
this.retries = retries;
this.maxAttempts = retries + 1;
this.interceptor = interceptor;
context = interceptor.createEmptyContext();
this.startLogErrorsCnt = startLogErrorsCnt;
@ -121,8 +121,8 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
} catch (Throwable t) {
ExceptionUtil.rethrowIfInterrupt(t);
if (tries > startLogErrorsCnt) {
LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
(EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
LOG.info("Call exception, tries=" + tries + ", maxAttempts=" + maxAttempts + ", started="
+ (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
+ "cancelled=" + cancelled.get() + ", msg="
+ callable.getExceptionMessageAdditionalDetail());
}
@ -130,12 +130,12 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
// translateException throws exception when should not retry: i.e. when request is bad.
interceptor.handleFailure(context, t);
t = translateException(t);
callable.throwable(t, retries != 1);
callable.throwable(t, maxAttempts != 1);
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTime(), toString());
exceptions.add(qt);
if (tries >= retries - 1) {
if (tries >= maxAttempts - 1) {
throw new RetriesExhaustedException(tries, exceptions);
}
// If the server is dead, we need to wait a little before retrying, to give
@ -162,7 +162,8 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
}
if (cancelled.get()) return null;
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries);
throw new InterruptedIOException("Interrupted after " + tries
+ " tries while maxAttempts=" + maxAttempts);
}
}
}
@ -231,6 +232,6 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
@Override
public String toString() {
return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
", pause=" + pause + ", retries=" + retries + '}';
", pause=" + pause + ", maxAttempts=" + maxAttempts + '}';
}
}
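
RpcRetryingCallerImpl now stores maxAttempts = retries + 1 and gives up once the zero-based tries counter reaches maxAttempts - 1. A simplified sketch of that loop shape with a plain Callable, no interceptor, backoff or exception translation, assuming retries >= 0:

import java.util.concurrent.Callable;

class RetryLoopSketch {
    static <T> T callWithRetries(Callable<T> callable, int retries) throws Exception {
        int maxAttempts = retries + 1; // one initial call plus the retries
        Exception last = null;
        for (int tries = 0; tries < maxAttempts; tries++) {
            try {
                return callable.call();
            } catch (Exception e) {
                last = e; // the real caller also logs, translates and sleeps here
            }
        }
        throw last; // the real caller wraps the failure history in RetriesExhaustedException
    }
}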

View File

@ -137,7 +137,6 @@ public class TestAsyncProcess {
AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
allReqs.add(r);
callsCt.incrementAndGet();
return r;
}
@ -571,7 +570,7 @@ public class TestAsyncProcess {
ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
Assert.assertEquals(0, puts.size());
ars.waitUntilDone();
Assert.assertEquals(2, ap.callsCt.get());
Assert.assertEquals(1, ap.callsCt.get());
verifyResult(ars, true);
}
@ -954,7 +953,7 @@ public class TestAsyncProcess {
// Main calls fail before replica calls can start - this is currently not handled.
// It would probably never happen if we can get location (due to retries),
// and it would require additional synchronization.
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 1);
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
ap.addFailures(hri1, hri2);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
@ -966,7 +965,7 @@ public class TestAsyncProcess {
public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
// Main calls fails after replica calls start. For two-replica region, one replica call
// also fails. Regardless, we get replica results for both regions.
MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 1);
MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
ap.addFailures(hri1, hri1r2, hri2);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
@ -978,7 +977,7 @@ public class TestAsyncProcess {
public void testReplicaAllCallsFailForOneRegion() throws Exception {
// For one of the region, all 3, main and replica, calls fail. For the other, replica
// call fails but its exception should not be visible as it did succeed.
MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 1);
MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
@ -1002,7 +1001,7 @@ public class TestAsyncProcess {
Configuration conf = new Configuration();
ClusterConnection conn = createHConnectionWithReplicas();
conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
if (retries > 0) {
if (retries >= 0) {
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
}
MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
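
Because 0 is now a meaningful setting (one attempt, no retry), the test helper's guard changes from retries > 0 to retries >= 0 and the expected call counts are adjusted accordingly. A tiny sketch of the new guard, with a hypothetical helper name and assuming hbase-client on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

class RetriesConfigSketch {
    // Negative means "leave the default"; zero and above are applied as-is.
    static void applyRetries(Configuration conf, int retries) {
        if (retries >= 0) {
            conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
        }
    }
}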

View File

@ -252,7 +252,7 @@ public class HMasterCommandLine extends ServerCommandLine {
private int stopMaster() {
Configuration conf = getConf();
// Don't try more than once
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Admin admin = connection.getAdmin()) {
admin.shutdown();
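
stopMaster() keeps its "don't try more than once" behaviour by setting the retries value to 0 instead of 1. A sketch of the same single-attempt idiom in client code, assuming hbase-client on the classpath and a reachable cluster (class and method names are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

class SingleAttemptShutdownSketch {
    static void shutdownOnce() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // 0 retries = 1 attempt
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            admin.shutdown();
        }
    }
}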

View File

@ -103,7 +103,7 @@ public class TestBlockEvictionFromClient {
conf.setStrings("hbase.bucketcache.ioengine", "heap");
conf.setFloat("hfile.block.cache.size", 0.2f);
conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
FAMILIES_1[0] = FAMILY;
TEST_UTIL.startMiniCluster(SLAVES);

View File

@ -485,7 +485,8 @@ public class TestHCM {
Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
// We want to work on a separate connection.
c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
// try only once w/o any retry
c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
final Connection connection = ConnectionFactory.createConnection(c2);
@ -575,7 +576,8 @@ public class TestHCM {
public void testRegionCaching() throws Exception{
TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
// test with no retry, or client cache will get updated after the first failure
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
Connection connection = ConnectionFactory.createConnection(conf);
final Table table = connection.getTable(TABLE_NAME);
@ -1052,11 +1054,11 @@ public class TestHCM {
// We also should not go over the boundary; last retry would be on it.
long timeLeft = (long)(ANY_PAUSE * 0.5);
timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
assertTrue(tracker.canRetryMore(1));
assertTrue(tracker.canTryMore(1));
tracker.reportServerError(location);
assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
timeMachine.setValue(timeBase + largeAmountOfTime);
assertFalse(tracker.canRetryMore(1));
assertFalse(tracker.canTryMore(1));
} finally {
EnvironmentEdgeManager.reset();
}
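
The tracker assertions above flip from true to false as a mocked clock moves past the tracker's deadline. A self-contained sketch of the dual count-and-time bound with an injectable time source in place of HBase's EnvironmentEdgeManager:

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

class ClockedTrackerSketch {
    private final int maxTries;        // total attempts allowed
    private final long canRetryUntil;  // absolute deadline in milliseconds
    private final LongSupplier clock;

    ClockedTrackerSketch(LongSupplier clock, long timeoutMs, int maxTries) {
        this.clock = clock;
        this.maxTries = maxTries;
        this.canRetryUntil = clock.getAsLong() + timeoutMs;
    }

    boolean canTryMore(int numAttempt) {
        // Give up only when BOTH the attempt count and the time budget are exhausted.
        return numAttempt < maxTries
            || (maxTries > 1 && clock.getAsLong() < canRetryUntil);
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(0);
        ClockedTrackerSketch tracker = new ClockedTrackerSketch(now::get, 1000, 2);
        System.out.println(tracker.canTryMore(2)); // true: tries used up, time remains
        now.set(1000);                             // move the clock to the deadline
        System.out.println(tracker.canTryMore(2)); // false: tries and time both exhausted
    }
}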