HBASE-16172 Unify the retry logic in ScannerCallableWithReplicas and RpcRetryingCallerWithReadReplicas

This commit is contained in:
tedyu 2016-07-18 06:54:09 -07:00
parent cfc22ec1ec
commit 630a1a41df
3 changed files with 15 additions and 7 deletions

View File

@@ -871,7 +871,7 @@ public class HTable implements HTableInterface, RegionLocator {
connConfiguration.getRetriesNumber(), connConfiguration.getRetriesNumber(),
operationTimeout, operationTimeout,
connConfiguration.getPrimaryCallTimeoutMicroSecond()); connConfiguration.getPrimaryCallTimeoutMicroSecond());
return callable.call(); return callable.call(operationTimeout);
} }

View File

@@ -47,10 +47,8 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@@ -194,7 +192,7 @@ public class RpcRetryingCallerWithReadReplicas {
* Globally, the number of retries, timeout and so on still applies, but it's per replica, * Globally, the number of retries, timeout and so on still applies, but it's per replica,
* not global. We continue until all retries are done, or all timeouts are exceeded. * not global. We continue until all retries are done, or all timeouts are exceeded.
*/ */
public synchronized Result call() public Result call(int operationTimeout)
throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
@@ -227,10 +225,17 @@ public class RpcRetryingCallerWithReadReplicas {
try { try {
try { try {
Future<Result> f = cs.take(); long start = EnvironmentEdgeManager.currentTime();
return f.get(); Future<Result> f = cs.poll(operationTimeout, TimeUnit.MILLISECONDS);
long duration = EnvironmentEdgeManager.currentTime() - start;
if (f == null) {
throw new RetriesExhaustedException("timed out after " + duration + " ms");
}
return f.get(operationTimeout - duration, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) { } catch (ExecutionException e) {
throwEnrichedException(e, retries); throwEnrichedException(e, retries);
} catch (TimeoutException te) {
throw new RetriesExhaustedException("timed out after " + operationTimeout + " ms");
} }
} catch (CancellationException e) { } catch (CancellationException e) {
throw new InterruptedIOException(); throw new InterruptedIOException();

View File

@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@@ -195,9 +196,11 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
try { try {
long start = EnvironmentEdgeManager.currentTime();
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS); Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
long duration = EnvironmentEdgeManager.currentTime() - start;
if (f != null) { if (f != null) {
Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS); Pair<Result[], ScannerCallable> r = f.get(timeout - duration, TimeUnit.MILLISECONDS);
if (r != null && r.getSecond() != null) { if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
} }