HBASE-21809 Add retry thrift client for ThriftTable/Admin

This commit is contained in:
Allan Yang 2019-01-31 16:43:09 +08:00
parent 1c71544497
commit 2776bc0151
1 changed files with 46 additions and 1 deletions

View File

@ -22,11 +22,16 @@ import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNE
import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.net.ssl.SSLException;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@ -35,6 +40,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
@ -42,10 +48,13 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.thrift.Constants;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.http.HttpRequest;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.protocol.HttpContext;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
@ -227,14 +236,50 @@ public class ThriftConnection implements Connection {
return new ThriftAdmin(client.getFirst(), client.getSecond(), conf);
}
public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler {
private long pause;
public DelayRetryHandler(int retryCount, long pause) {
super(retryCount, true, Arrays.asList(
InterruptedIOException.class,
UnknownHostException.class,
SSLException.class));
this.pause = pause;
}
@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
// Don't sleep for retrying the first time
if (executionCount > 1 && pause > 0) {
try {
long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1);
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
//reset interrupt marker
Thread.currentThread().interrupt();
}
}
return super.retryRequest(exception, executionCount, context);
}
@Override
protected boolean handleAsIdempotent(HttpRequest request) {
return true;
}
}
public synchronized HttpClient getHttpClient() {
if (httpClient != null) {
return httpClient;
}
int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5);
HttpClientBuilder builder = HttpClientBuilder.create();
RequestConfig.Builder requestBuilder = RequestConfig.custom();
requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout());
requestBuilder = requestBuilder.setConnectionRequestTimeout(getOperationTimeout());
requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout());
builder.setRetryHandler(new DelayRetryHandler(retry, pause));
builder.setDefaultRequestConfig(requestBuilder.build());
httpClient = builder.build();
httpClientCreated = true;