HBASE-15645 hbase.rpc.timeout is not used in operations of HTable
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
2562043e23
commit
d5acbdd1e4
|
@ -214,6 +214,7 @@ public class HBaseAdmin implements Admin {
|
||||||
private boolean cleanupConnectionOnClose = false; // close the connection in close()
|
private boolean cleanupConnectionOnClose = false; // close the connection in close()
|
||||||
private boolean closed = false;
|
private boolean closed = false;
|
||||||
private int operationTimeout;
|
private int operationTimeout;
|
||||||
|
private int rpcTimeout;
|
||||||
|
|
||||||
private RpcRetryingCallerFactory rpcCallerFactory;
|
private RpcRetryingCallerFactory rpcCallerFactory;
|
||||||
private RpcControllerFactory rpcControllerFactory;
|
private RpcControllerFactory rpcControllerFactory;
|
||||||
|
@ -273,6 +274,8 @@ public class HBaseAdmin implements Admin {
|
||||||
"hbase.client.retries.longer.multiplier", 10);
|
"hbase.client.retries.longer.multiplier", 10);
|
||||||
this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||||
|
this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
this.syncWaitTimeout = this.conf.getInt(
|
this.syncWaitTimeout = this.conf.getInt(
|
||||||
"hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
|
"hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
|
||||||
|
|
||||||
|
@ -543,12 +546,12 @@ public class HBaseAdmin implements Admin {
|
||||||
public HTableDescriptor getTableDescriptor(final TableName tableName)
|
public HTableDescriptor getTableDescriptor(final TableName tableName)
|
||||||
throws TableNotFoundException, IOException {
|
throws TableNotFoundException, IOException {
|
||||||
return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
|
return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
|
||||||
operationTimeout);
|
operationTimeout, rpcTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
static HTableDescriptor getTableDescriptor(final TableName tableName, HConnection connection,
|
static HTableDescriptor getTableDescriptor(final TableName tableName, HConnection connection,
|
||||||
RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
|
RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
|
||||||
int operationTimeout) throws TableNotFoundException, IOException {
|
int operationTimeout, int rpcTimeout) throws TableNotFoundException, IOException {
|
||||||
|
|
||||||
if (tableName == null) return null;
|
if (tableName == null) return null;
|
||||||
HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
|
HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
|
||||||
|
@ -566,7 +569,7 @@ public class HBaseAdmin implements Admin {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}, rpcCallerFactory, operationTimeout);
|
}, rpcCallerFactory, operationTimeout, rpcTimeout);
|
||||||
if (htd != null) {
|
if (htd != null) {
|
||||||
return htd;
|
return htd;
|
||||||
}
|
}
|
||||||
|
@ -4373,12 +4376,13 @@ public class HBaseAdmin implements Admin {
|
||||||
}
|
}
|
||||||
|
|
||||||
private <V> V executeCallable(MasterCallable<V> callable) throws IOException {
|
private <V> V executeCallable(MasterCallable<V> callable) throws IOException {
|
||||||
return executeCallable(callable, rpcCallerFactory, operationTimeout);
|
return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <V> V executeCallable(MasterCallable<V> callable,
|
private static <V> V executeCallable(MasterCallable<V> callable,
|
||||||
RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout) throws IOException {
|
RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
|
||||||
RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
|
throws IOException {
|
||||||
|
RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
|
||||||
try {
|
try {
|
||||||
return caller.callWithRetries(callable, operationTimeout);
|
return caller.callWithRetries(callable, operationTimeout);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -124,7 +124,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
protected int scannerCaching;
|
protected int scannerCaching;
|
||||||
protected long scannerMaxResultSize;
|
protected long scannerMaxResultSize;
|
||||||
private ExecutorService pool; // For Multi & Scan
|
private ExecutorService pool; // For Multi & Scan
|
||||||
private int operationTimeout;
|
private int operationTimeout; // global timeout for each blocking method with retrying rpc
|
||||||
|
private int rpcTimeout; // timeout for each rpc request
|
||||||
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
|
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
|
||||||
private final boolean cleanupConnectionOnClose; // close the connection in close()
|
private final boolean cleanupConnectionOnClose; // close the connection in close()
|
||||||
private Consistency defaultConsistency = Consistency.STRONG;
|
private Consistency defaultConsistency = Consistency.STRONG;
|
||||||
|
@ -353,9 +354,10 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
if (connConfiguration == null) {
|
if (connConfiguration == null) {
|
||||||
connConfiguration = new ConnectionConfiguration(configuration);
|
connConfiguration = new ConnectionConfiguration(configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.operationTimeout = tableName.isSystemTable() ?
|
this.operationTimeout = tableName.isSystemTable() ?
|
||||||
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
|
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
|
||||||
|
this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
this.scannerCaching = connConfiguration.getScannerCaching();
|
this.scannerCaching = connConfiguration.getScannerCaching();
|
||||||
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
||||||
if (this.rpcCallerFactory == null) {
|
if (this.rpcCallerFactory == null) {
|
||||||
|
@ -571,7 +573,7 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
@Override
|
@Override
|
||||||
public HTableDescriptor getTableDescriptor() throws IOException {
|
public HTableDescriptor getTableDescriptor() throws IOException {
|
||||||
HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
|
HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
|
||||||
rpcControllerFactory, operationTimeout);
|
rpcControllerFactory, operationTimeout, rpcTimeout);
|
||||||
if (htd != null) {
|
if (htd != null) {
|
||||||
return new UnmodifyableHTableDescriptor(htd);
|
return new UnmodifyableHTableDescriptor(htd);
|
||||||
}
|
}
|
||||||
|
@ -753,7 +755,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
|
return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -858,7 +861,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
|
return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call that takes into account the replica
|
// Call that takes into account the replica
|
||||||
|
@ -974,7 +978,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
|
rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1103,7 +1108,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
|
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1134,7 +1140,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
|
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1204,7 +1211,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
|
return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1233,7 +1241,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
|
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1263,7 +1272,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
|
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1292,7 +1302,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
|
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1322,7 +1333,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
|
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
||||||
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1794,6 +1806,14 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||||
return operationTimeout;
|
return operationTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public void setRpcTimeout(int rpcTimeout) {
|
||||||
|
this.rpcTimeout = rpcTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int getRpcTimeout() {
|
||||||
|
return rpcTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return tableName + ";" + connection;
|
return tableName + ";" + connection;
|
||||||
|
|
|
@ -669,5 +669,21 @@ public class HTablePool implements Closeable {
|
||||||
checkState();
|
checkState();
|
||||||
return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
|
return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public void setOperationTimeout(int operationTimeout) {
|
||||||
|
table.setOperationTimeout(operationTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int getOperationTimeout() {
|
||||||
|
return table.getOperationTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void setRpcTimeout(int rpcTimeout) {
|
||||||
|
table.setRpcTimeout(rpcTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int getRpcTimeout() {
|
||||||
|
return table.getRpcTimeout();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,21 +63,23 @@ public class RpcRetryingCaller<T> {
|
||||||
|
|
||||||
private final long pause;
|
private final long pause;
|
||||||
private final int retries;
|
private final int retries;
|
||||||
|
private final int rpcTimeout;// timeout for each rpc request
|
||||||
private final AtomicBoolean cancelled = new AtomicBoolean(false);
|
private final AtomicBoolean cancelled = new AtomicBoolean(false);
|
||||||
private final RetryingCallerInterceptor interceptor;
|
private final RetryingCallerInterceptor interceptor;
|
||||||
private final RetryingCallerInterceptorContext context;
|
private final RetryingCallerInterceptorContext context;
|
||||||
|
|
||||||
public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
|
public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
|
||||||
this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
|
this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RpcRetryingCaller(long pause, int retries,
|
public RpcRetryingCaller(long pause, int retries,
|
||||||
RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {
|
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
|
||||||
this.pause = pause;
|
this.pause = pause;
|
||||||
this.retries = retries;
|
this.retries = retries;
|
||||||
this.interceptor = interceptor;
|
this.interceptor = interceptor;
|
||||||
context = interceptor.createEmptyContext();
|
context = interceptor.createEmptyContext();
|
||||||
this.startLogErrorsCnt = startLogErrorsCnt;
|
this.startLogErrorsCnt = startLogErrorsCnt;
|
||||||
|
this.rpcTimeout = rpcTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getRemainingTime(int callTimeout) {
|
private int getRemainingTime(int callTimeout) {
|
||||||
|
@ -97,6 +99,14 @@ public class RpcRetryingCaller<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getTimeout(int callTimeout){
|
||||||
|
int timeout = getRemainingTime(callTimeout);
|
||||||
|
if (timeout <= 0 || rpcTimeout > 0 && rpcTimeout < timeout){
|
||||||
|
timeout = rpcTimeout;
|
||||||
|
}
|
||||||
|
return timeout;
|
||||||
|
}
|
||||||
|
|
||||||
public void cancel(){
|
public void cancel(){
|
||||||
synchronized (cancelled){
|
synchronized (cancelled){
|
||||||
cancelled.set(true);
|
cancelled.set(true);
|
||||||
|
@ -123,7 +133,7 @@ public class RpcRetryingCaller<T> {
|
||||||
try {
|
try {
|
||||||
callable.prepare(tries != 0); // if called with false, check table status on ZK
|
callable.prepare(tries != 0); // if called with false, check table status on ZK
|
||||||
interceptor.intercept(context.prepare(callable, tries));
|
interceptor.intercept(context.prepare(callable, tries));
|
||||||
return callable.call(getRemainingTime(callTimeout));
|
return callable.call(getTimeout(callTimeout));
|
||||||
} catch (PreemptiveFastFailException e) {
|
} catch (PreemptiveFastFailException e) {
|
||||||
throw e;
|
throw e;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
|
|
@ -33,6 +33,7 @@ public class RpcRetryingCallerFactory {
|
||||||
protected final Configuration conf;
|
protected final Configuration conf;
|
||||||
private final long pause;
|
private final long pause;
|
||||||
private final int retries;
|
private final int retries;
|
||||||
|
private final int rpcTimeout;
|
||||||
private final RetryingCallerInterceptor interceptor;
|
private final RetryingCallerInterceptor interceptor;
|
||||||
private final int startLogErrorsCnt;
|
private final int startLogErrorsCnt;
|
||||||
private final boolean enableBackPressure;
|
private final boolean enableBackPressure;
|
||||||
|
@ -53,6 +54,7 @@ public class RpcRetryingCallerFactory {
|
||||||
this.interceptor = interceptor;
|
this.interceptor = interceptor;
|
||||||
enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
|
enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
|
||||||
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
|
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
|
||||||
|
rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -62,11 +64,25 @@ public class RpcRetryingCallerFactory {
|
||||||
this.stats = statisticTracker;
|
this.stats = statisticTracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new RetryingCaller with specific rpc timeout.
|
||||||
|
*/
|
||||||
|
public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
|
||||||
|
// We store the values in the factory instance. This way, constructing new objects
|
||||||
|
// is cheap as it does not require parsing a complex structure.
|
||||||
|
RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor,
|
||||||
|
startLogErrorsCnt, rpcTimeout);
|
||||||
|
return caller;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new RetryingCaller with configured rpc timeout.
|
||||||
|
*/
|
||||||
public <T> RpcRetryingCaller<T> newCaller() {
|
public <T> RpcRetryingCaller<T> newCaller() {
|
||||||
// We store the values in the factory instance. This way, constructing new objects
|
// We store the values in the factory instance. This way, constructing new objects
|
||||||
// is cheap as it does not require parsing a complex structure.
|
// is cheap as it does not require parsing a complex structure.
|
||||||
RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor,
|
RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor,
|
||||||
startLogErrorsCnt);
|
startLogErrorsCnt, rpcTimeout);
|
||||||
return caller;
|
return caller;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -595,4 +595,35 @@ public interface Table extends Closeable {
|
||||||
*/
|
*/
|
||||||
boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||||
CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException;
|
CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set timeout (millisecond) of each operation in this Table instance, will override the value
|
||||||
|
* of hbase.client.operation.timeout in configuration.
|
||||||
|
* Operation timeout is a top-level restriction that makes sure a blocking method will not be
|
||||||
|
* blocked more than this. In each operation, if rpc request fails because of timeout or
|
||||||
|
* other reason, it will retry until success or throw a RetriesExhaustedException. But if the
|
||||||
|
* total time being blocking reach the operation timeout before retries exhausted, it will break
|
||||||
|
* early and throw SocketTimeoutException.
|
||||||
|
* @param operationTimeout the total timeout of each operation in millisecond.
|
||||||
|
*/
|
||||||
|
public void setOperationTimeout(int operationTimeout);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get timeout (millisecond) of each operation for in Table instance.
|
||||||
|
*/
|
||||||
|
public int getOperationTimeout();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set timeout (millisecond) of each rpc request in operations of this Table instance, will
|
||||||
|
* override the value of hbase.rpc.timeout in configuration.
|
||||||
|
* If a rpc request waiting too long, it will stop waiting and send a new request to retry until
|
||||||
|
* retries exhausted or operation timeout reached.
|
||||||
|
* @param rpcTimeout the timeout of each rpc request in millisecond.
|
||||||
|
*/
|
||||||
|
public void setRpcTimeout(int rpcTimeout);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get timeout (millisecond) of each rpc request in this Table instance.
|
||||||
|
*/
|
||||||
|
public int getRpcTimeout();
|
||||||
}
|
}
|
||||||
|
|
|
@ -283,10 +283,10 @@ public final class HConstants {
|
||||||
/** Parameter name for HBase client IPC pool size */
|
/** Parameter name for HBase client IPC pool size */
|
||||||
public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size";
|
public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size";
|
||||||
|
|
||||||
/** Parameter name for HBase client operation timeout, which overrides RPC timeout */
|
/** Parameter name for HBase client operation timeout. */
|
||||||
public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout";
|
public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout";
|
||||||
|
|
||||||
/** Parameter name for HBase client operation timeout, which overrides RPC timeout */
|
/** Parameter name for HBase client operation timeout. */
|
||||||
public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT =
|
public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT =
|
||||||
"hbase.client.meta.operation.timeout";
|
"hbase.client.meta.operation.timeout";
|
||||||
|
|
||||||
|
|
|
@ -844,10 +844,19 @@ possible configurations would overwhelm and obscure the important.
|
||||||
<property>
|
<property>
|
||||||
<name>hbase.rpc.timeout</name>
|
<name>hbase.rpc.timeout</name>
|
||||||
<value>60000</value>
|
<value>60000</value>
|
||||||
<description>This is for the RPC layer to define how long HBase client applications
|
<description>This is for the RPC layer to define how long (millisecond) HBase client applications
|
||||||
take for a remote call to time out. It uses pings to check connections
|
take for a remote call to time out. It uses pings to check connections
|
||||||
but will eventually throw a TimeoutException.</description>
|
but will eventually throw a TimeoutException.</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.client.operation.timeout</name>
|
||||||
|
<value>1200000</value>
|
||||||
|
<description>Operation timeout is a top-level restriction (millisecond) that makes sure a
|
||||||
|
blocking operation in Table will not be blocked more than this. In each operation, if rpc
|
||||||
|
request fails because of timeout or other reason, it will retry until success or throw
|
||||||
|
RetriesExhaustedException. But if the total time being blocking reach the operation timeout
|
||||||
|
before retries exhausted, it will break early and throw SocketTimeoutException.</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>hbase.cells.scanned.per.heartbeat.check</name>
|
<name>hbase.cells.scanned.per.heartbeat.check</name>
|
||||||
<value>10000</value>
|
<value>10000</value>
|
||||||
|
|
|
@ -857,4 +857,20 @@ public class RemoteHTable implements Table {
|
||||||
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
|
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
|
||||||
throw new UnsupportedOperationException("checkAndMutate not implemented");
|
throw new UnsupportedOperationException("checkAndMutate not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public void setOperationTimeout(int operationTimeout) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int getOperationTimeout() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void setRpcTimeout(int rpcTimeout) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int getRpcTimeout() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -365,4 +365,20 @@ public class HTableWrapper implements HTableInterface {
|
||||||
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
|
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
|
||||||
return table.checkAndMutate(row, family, qualifier, compareOp, value, rm);
|
return table.checkAndMutate(row, family, qualifier, compareOp, value, rm);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public void setOperationTimeout(int operationTimeout) {
|
||||||
|
table.setOperationTimeout(operationTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int getOperationTimeout() {
|
||||||
|
return table.getOperationTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void setRpcTimeout(int rpcTimeout) {
|
||||||
|
table.setRpcTimeout(rpcTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int getRpcTimeout() {
|
||||||
|
return table.getRpcTimeout();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,11 +134,21 @@ public class TestHCM {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class SleepCoprocessor extends BaseRegionObserver {
|
||||||
|
public static final int SLEEP_TIME = 5000;
|
||||||
|
@Override
|
||||||
|
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
final Get get, final List<Cell> results) throws IOException {
|
||||||
|
Threads.sleep(SLEEP_TIME);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
|
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
|
||||||
// Up the handlers; this test needs more than usual.
|
// Up the handlers; this test needs more than usual.
|
||||||
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
|
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
|
||||||
TEST_UTIL.startMiniCluster(2);
|
TEST_UTIL.startMiniCluster(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,7 +319,7 @@ public class TestHCM {
|
||||||
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
|
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
|
||||||
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
|
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
|
||||||
HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
|
HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
|
||||||
|
table.setRpcTimeout(Integer.MAX_VALUE);
|
||||||
// Check that it works if the timeout is big enough
|
// Check that it works if the timeout is big enough
|
||||||
table.setOperationTimeout(120 * 1000);
|
table.setOperationTimeout(120 * 1000);
|
||||||
table.get(new Get(FAM_NAM));
|
table.get(new Get(FAM_NAM));
|
||||||
|
@ -332,6 +342,20 @@ public class TestHCM {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = RetriesExhaustedException.class)
|
||||||
|
public void testRpcTimeout() throws Exception {
|
||||||
|
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
|
||||||
|
hdt.addCoprocessor(SleepCoprocessor.class.getName());
|
||||||
|
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
|
||||||
|
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
|
||||||
|
assert t instanceof HTable;
|
||||||
|
HTable table = (HTable) t;
|
||||||
|
table.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
|
||||||
|
table.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
|
||||||
|
table.get(new Get(FAM_NAM));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void testConnectionClose(boolean allowsInterrupt) throws Exception {
|
private void testConnectionClose(boolean allowsInterrupt) throws Exception {
|
||||||
TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
|
TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
|
||||||
|
|
Loading…
Reference in New Issue