From 1fa3637b4d0020b1c4387610e8aa6b970c0138b8 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 3 Jan 2018 16:41:21 +0800 Subject: [PATCH] HBASE-19641 AsyncHBaseAdmin should use exponential backoff when polling the procedure result --- .../hbase/client/RawAsyncHBaseAdmin.java | 57 +++++++++---------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 7a8d081dbbf..ceda2806d9e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -89,6 +89,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.TimerTask; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; @@ -2553,40 +2554,36 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { future.completeExceptionally(error); return; } - getProcedureResult(procId, future); + getProcedureResult(procId, future, 0); }); return future; } - private void getProcedureResult(final long procId, CompletableFuture future) { - this. newMasterCaller() - .action( - (controller, stub) -> this - . call( - controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), - (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) - .call() - .whenComplete( - (response, error) -> { - if (error != null) { - LOG.warn("failed to get the procedure result procId=" + procId, - ConnectionUtils.translateException(error)); - retryTimer.newTimeout(t -> getProcedureResult(procId, future), pauseNs, - TimeUnit.NANOSECONDS); - return; - } - if (response.getState() == GetProcedureResultResponse.State.RUNNING) { - retryTimer.newTimeout(t -> getProcedureResult(procId, future), pauseNs, - TimeUnit.NANOSECONDS); - return; - } - if (response.hasException()) { - IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); - future.completeExceptionally(ioe); - } else { - future.complete(null); - } - }); + private void getProcedureResult(long procId, CompletableFuture future, int retries) { + this. newMasterCaller().action((controller, stub) -> this + . call( + controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), + (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) + .call().whenComplete((response, error) -> { + if (error != null) { + LOG.warn("failed to get the procedure result procId={}", procId, + ConnectionUtils.translateException(error)); + retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); + return; + } + if (response.getState() == GetProcedureResultResponse.State.RUNNING) { + retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); + return; + } + if (response.hasException()) { + IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); + future.completeExceptionally(ioe); + } else { + future.complete(null); + } + }); } private CompletableFuture failedFuture(Throwable error) {