HBASE-19641 AsyncHBaseAdmin should use exponential backoff when polling the procedure result

This commit is contained in:
zhangduo 2018-01-03 16:41:21 +08:00
parent a47afc84cd
commit 1fa3637b4d
1 changed files with 27 additions and 30 deletions

View File

@ -89,6 +89,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask; import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@ -2553,40 +2554,36 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.completeExceptionally(error); future.completeExceptionally(error);
return; return;
} }
getProcedureResult(procId, future); getProcedureResult(procId, future, 0);
}); });
return future; return future;
} }
private void getProcedureResult(final long procId, CompletableFuture<Void> future) { private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
this.<GetProcedureResultResponse> newMasterCaller() this.<GetProcedureResultResponse> newMasterCaller().action((controller, stub) -> this
.action( .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
(controller, stub) -> this controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
.<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), .call().whenComplete((response, error) -> {
(s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) if (error != null) {
.call() LOG.warn("failed to get the procedure result procId={}", procId,
.whenComplete( ConnectionUtils.translateException(error));
(response, error) -> { retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
if (error != null) { ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
LOG.warn("failed to get the procedure result procId=" + procId, return;
ConnectionUtils.translateException(error)); }
retryTimer.newTimeout(t -> getProcedureResult(procId, future), pauseNs, if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
TimeUnit.NANOSECONDS); retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
return; ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
} return;
if (response.getState() == GetProcedureResultResponse.State.RUNNING) { }
retryTimer.newTimeout(t -> getProcedureResult(procId, future), pauseNs, if (response.hasException()) {
TimeUnit.NANOSECONDS); IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
return; future.completeExceptionally(ioe);
} } else {
if (response.hasException()) { future.complete(null);
IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); }
future.completeExceptionally(ioe); });
} else {
future.complete(null);
}
});
} }
private <T> CompletableFuture<T> failedFuture(Throwable error) { private <T> CompletableFuture<T> failedFuture(Throwable error) {