From 2776bc0151051c8d20d9b1c2ac6142ade6a31b62 Mon Sep 17 00:00:00 2001 From: Allan Yang Date: Thu, 31 Jan 2019 16:43:09 +0800 Subject: [PATCH] HBASE-21809 Add retry thrift client for ThriftTable/Admin --- .../thrift2/client/ThriftConnection.java | 47 ++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java index cc186aa8fee..36e513c157f 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java @@ -22,11 +22,16 @@ import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNE import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT; import java.io.IOException; +import java.io.InterruptedIOException; import java.lang.reflect.Constructor; +import java.net.UnknownHostException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; +import javax.net.ssl.SSLException; + import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -35,6 +40,7 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableBuilder; @@ -42,10 +48,13 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.thrift.Constants; import org.apache.hadoop.hbase.thrift2.generated.THBaseService; import org.apache.hadoop.hbase.util.Pair; +import org.apache.http.HttpRequest; import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.utils.HttpClientUtils; +import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.protocol.HttpContext; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; @@ -227,14 +236,50 @@ public class ThriftConnection implements Connection { return new ThriftAdmin(client.getFirst(), client.getSecond(), conf); } + public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler { + private long pause; + + public DelayRetryHandler(int retryCount, long pause) { + super(retryCount, true, Arrays.asList( + InterruptedIOException.class, + UnknownHostException.class, + SSLException.class)); + this.pause = pause; + } + + @Override + public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { + // Don't sleep for retrying the first time + if (executionCount > 1 && pause > 0) { + try { + long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1); + Thread.sleep(sleepTime); + } catch (InterruptedException ie) { + //reset interrupt marker + Thread.currentThread().interrupt(); + } + } + return super.retryRequest(exception, executionCount, context); + } + + @Override + protected boolean handleAsIdempotent(HttpRequest request) { + return true; + } + } + public synchronized HttpClient getHttpClient() { if (httpClient != null) { return httpClient; } + int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5); HttpClientBuilder builder = HttpClientBuilder.create(); RequestConfig.Builder requestBuilder = RequestConfig.custom(); requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout()); - requestBuilder = requestBuilder.setConnectionRequestTimeout(getOperationTimeout()); + requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout()); + builder.setRetryHandler(new DelayRetryHandler(retry, pause)); builder.setDefaultRequestConfig(requestBuilder.build()); httpClient = builder.build(); httpClientCreated = true;