diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index efa03c60250..4cd11d0470c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -871,7 +871,7 @@ public class HTable implements HTableInterface, RegionLocator { connConfiguration.getRetriesNumber(), operationTimeout, connConfiguration.getPrimaryCallTimeoutMicroSecond()); - return callable.call(); + return callable.call(operationTimeout); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 80c187c37d4..aefa3bc66b6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -47,10 +47,8 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -194,7 +192,7 @@ public class RpcRetryingCallerWithReadReplicas { * Globally, the number of retries, timeout and so on still applies, but it's per replica, * not global. We continue until all retries are done, or all timeouts are exceeded. */ - public synchronized Result call() + public Result call(int operationTimeout) throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); @@ -227,10 +225,17 @@ public class RpcRetryingCallerWithReadReplicas { try { try { - Future f = cs.take(); - return f.get(); + long start = EnvironmentEdgeManager.currentTime(); + Future f = cs.poll(operationTimeout, TimeUnit.MILLISECONDS); + long duration = EnvironmentEdgeManager.currentTime() - start; + if (f == null) { + throw new RetriesExhaustedException("timed out after " + duration + " ms"); + } + return f.get(operationTimeout - duration, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throwEnrichedException(e, retries); + } catch (TimeoutException te) { + throw new RetriesExhaustedException("timed out after " + operationTimeout + " ms"); } } catch (CancellationException e) { throw new InterruptedIOException(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 4d5bb0f00d3..4e8647fead3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import com.google.common.annotations.VisibleForTesting; @@ -195,9 +196,11 @@ class ScannerCallableWithReplicas implements RetryingCallable { addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); try { + long start = EnvironmentEdgeManager.currentTime(); Future> f = cs.poll(timeout, TimeUnit.MILLISECONDS); + long duration = EnvironmentEdgeManager.currentTime() - start; if (f != null) { - Pair r = f.get(timeout, TimeUnit.MILLISECONDS); + Pair r = f.get(timeout - duration, TimeUnit.MILLISECONDS); if (r != null && r.getSecond() != null) { updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); }