HBASE-15645 hbase.rpc.timeout is not used in operations of HTable

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Phil Yang 2016-04-18 16:37:37 +08:00 committed by stack
parent dce59294ff
commit 53d7316075
11 changed files with 207 additions and 28 deletions

View File

@ -215,6 +215,7 @@ public class HBaseAdmin implements Admin {
private final int syncWaitTimeout;
private boolean aborted;
private int operationTimeout;
private int rpcTimeout;
private RpcRetryingCallerFactory rpcCallerFactory;
private RpcControllerFactory rpcControllerFactory;
@ -239,6 +240,8 @@ public class HBaseAdmin implements Admin {
"hbase.client.retries.longer.multiplier", 10);
this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.syncWaitTimeout = this.conf.getInt(
"hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
@ -402,12 +405,12 @@ public class HBaseAdmin implements Admin {
@Override
public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
operationTimeout);
operationTimeout, rpcTimeout);
}
static HTableDescriptor getTableDescriptor(final TableName tableName, HConnection connection,
RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
int operationTimeout) throws IOException {
int operationTimeout, int rpcTimeout) throws IOException {
if (tableName == null) return null;
HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
@Override
@ -424,7 +427,7 @@ public class HBaseAdmin implements Admin {
}
return null;
}
}, rpcCallerFactory, operationTimeout);
}, rpcCallerFactory, operationTimeout, rpcTimeout);
if (htd != null) {
return htd;
}
@ -2841,12 +2844,13 @@ public class HBaseAdmin implements Admin {
private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable)
throws IOException {
return executeCallable(callable, rpcCallerFactory, operationTimeout);
return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout);
}
private static <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout) throws IOException {
RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout,
int rpcTimeout) throws IOException {
RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
try {
return caller.callWithRetries(callable, operationTimeout);
} finally {

View File

@ -112,7 +112,8 @@ public class HTable implements HTableInterface {
protected int scannerCaching;
protected long scannerMaxResultSize;
private ExecutorService pool; // For Multi & Scan
private int operationTimeout;
private int operationTimeout; // global timeout for each blocking method with retrying rpc
private int rpcTimeout; // timeout for each rpc request
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG;
@ -212,6 +213,8 @@ public class HTable implements HTableInterface {
this.operationTimeout = tableName.isSystemTable() ?
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
if (this.rpcCallerFactory == null) {
@ -266,7 +269,7 @@ public class HTable implements HTableInterface {
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
rpcControllerFactory, operationTimeout);
rpcControllerFactory, operationTimeout, rpcTimeout);
if (htd != null) {
return new UnmodifyableHTableDescriptor(htd);
}
@ -439,7 +442,8 @@ public class HTable implements HTableInterface {
}
}
};
return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
// Call that takes into account the replica
@ -525,7 +529,8 @@ public class HTable implements HTableInterface {
}
}
};
rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@ -654,7 +659,8 @@ public class HTable implements HTableInterface {
}
}
};
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@ -685,7 +691,8 @@ public class HTable implements HTableInterface {
}
}
};
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@ -740,7 +747,8 @@ public class HTable implements HTableInterface {
}
}
};
return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@ -769,7 +777,8 @@ public class HTable implements HTableInterface {
}
}
};
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@ -799,7 +808,8 @@ public class HTable implements HTableInterface {
}
}
};
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@ -828,7 +838,8 @@ public class HTable implements HTableInterface {
}
}
};
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@ -858,7 +869,8 @@ public class HTable implements HTableInterface {
}
}
};
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@ -1203,14 +1215,26 @@ public class HTable implements HTableInterface {
return getKeysAndRegionsInRange(start, end, true).getFirst();
}
@Override
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
}
@Override
public int getOperationTimeout() {
return operationTimeout;
}
@Override
public int getRpcTimeout() {
return rpcTimeout;
}
@Override
public void setRpcTimeout(int rpcTimeout) {
this.rpcTimeout = rpcTimeout;
}
@Override
public String toString() {
return tableName + ";" + connection;

View File

@ -33,6 +33,7 @@ public class RpcRetryingCallerFactory {
protected final Configuration conf;
private final long pause;
private final int retries;
private final int rpcTimeout;
private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt;
private final boolean enableBackPressure;
@ -53,6 +54,7 @@ public class RpcRetryingCallerFactory {
this.interceptor = interceptor;
enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
}
/**
@ -62,11 +64,25 @@ public class RpcRetryingCallerFactory {
this.stats = statisticTracker;
}
/**
* Create a new RetryingCaller with specific rpc timeout.
*/
public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
// We store the values in the factory instance. This way, constructing new objects
// is cheap as it does not require parsing a complex structure.
RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,
startLogErrorsCnt, rpcTimeout);
return caller;
}
/**
* Create a new RetryingCaller with configured rpc timeout.
*/
public <T> RpcRetryingCaller<T> newCaller() {
// We store the values in the factory instance. This way, constructing new objects
// is cheap as it does not require parsing a complex structure.
RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,
startLogErrorsCnt);
startLogErrorsCnt, rpcTimeout);
return caller;
}

View File

@ -57,23 +57,25 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
private final long pause;
private final int maxAttempts;// how many times to try
private final int rpcTimeout;// timeout for each rpc request
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final RetryingCallerInterceptor interceptor;
private final RetryingCallerInterceptorContext context;
private final RetryingTimeTracker tracker;
public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {
this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
}
public RpcRetryingCallerImpl(long pause, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
this.pause = pause;
this.maxAttempts = retries + 1;
this.interceptor = interceptor;
context = interceptor.createEmptyContext();
this.startLogErrorsCnt = startLogErrorsCnt;
this.tracker = new RetryingTimeTracker();
this.rpcTimeout = rpcTimeout;
}
@Override
@ -96,7 +98,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
try {
callable.prepare(tries != 0); // if called with false, check table status on ZK
interceptor.intercept(context.prepare(callable, tries));
return callable.call(tracker.getRemainingTime(callTimeout));
return callable.call(getTimeout(callTimeout));
} catch (PreemptiveFastFailException e) {
throw e;
} catch (Throwable t) {
@ -209,6 +211,14 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
return t;
}
private int getTimeout(int callTimeout){
int timeout = tracker.getRemainingTime(callTimeout);
if (timeout <= 0 || rpcTimeout > 0 && rpcTimeout < timeout){
timeout = rpcTimeout;
}
return timeout;
}
@Override
public String toString() {
return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() +

View File

@ -566,4 +566,36 @@ public interface Table extends Closeable {
*/
boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException;
/**
* Set timeout (millisecond) of each operation in this Table instance, will override the value
* of hbase.client.operation.timeout in configuration.
* Operation timeout is a top-level restriction that makes sure a blocking method will not be
* blocked more than this. In each operation, if rpc request fails because of timeout or
* other reason, it will retry until success or throw a RetriesExhaustedException. But if the
* total time being blocking reach the operation timeout before retries exhausted, it will break
* early and throw SocketTimeoutException.
* @param operationTimeout the total timeout of each operation in millisecond.
*/
public void setOperationTimeout(int operationTimeout);
/**
* Get timeout (millisecond) of each operation for in Table instance.
*/
public int getOperationTimeout();
/**
* Set timeout (millisecond) of each rpc request in operations of this Table instance, will
* override the value of hbase.rpc.timeout in configuration.
* If a rpc request waiting too long, it will stop waiting and send a new request to retry until
* retries exhausted or operation timeout reached.
* @param rpcTimeout the timeout of each rpc request in millisecond.
*/
public void setRpcTimeout(int rpcTimeout);
/**
* Get timeout (millisecond) of each rpc request in this Table instance.
*/
public int getRpcTimeout();
}

View File

@ -286,10 +286,10 @@ public final class HConstants {
/** Parameter name for HBase client IPC pool size */
public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size";
/** Parameter name for HBase client operation timeout, which overrides RPC timeout */
/** Parameter name for HBase client operation timeout. */
public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout";
/** Parameter name for HBase client operation timeout, which overrides RPC timeout */
/** Parameter name for HBase client operation timeout. */
public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT =
"hbase.client.meta.operation.timeout";

View File

@ -918,10 +918,19 @@ possible configurations would overwhelm and obscure the important.
<property>
<name>hbase.rpc.timeout</name>
<value>60000</value>
<description>This is for the RPC layer to define how long HBase client applications
<description>This is for the RPC layer to define how long (millisecond) HBase client applications
take for a remote call to time out. It uses pings to check connections
but will eventually throw a TimeoutException.</description>
</property>
<property>
<name>hbase.client.operation.timeout</name>
<value>1200000</value>
<description>Operation timeout is a top-level restriction (millisecond) that makes sure a
blocking operation in Table will not be blocked more than this. In each operation, if rpc
request fails because of timeout or other reason, it will retry until success or throw
RetriesExhaustedException. But if the total time being blocking reach the operation timeout
before retries exhausted, it will break early and throw SocketTimeoutException.</description>
</property>
<property>
<name>hbase.cells.scanned.per.heartbeat.check</name>
<value>10000</value>

View File

@ -848,4 +848,24 @@ public class RemoteHTable implements Table {
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
throw new UnsupportedOperationException("checkAndMutate not implemented");
}
@Override
public void setOperationTimeout(int operationTimeout) {
throw new UnsupportedOperationException();
}
@Override
public int getOperationTimeout() {
throw new UnsupportedOperationException();
}
@Override
public void setRpcTimeout(int rpcTimeout) {
throw new UnsupportedOperationException();
}
@Override
public int getRpcTimeout() {
throw new UnsupportedOperationException();
}
}

View File

@ -294,4 +294,24 @@ public final class HTableWrapper implements Table {
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
return table.checkAndMutate(row, family, qualifier, compareOp, value, rm);
}
@Override
public void setOperationTimeout(int operationTimeout) {
table.setOperationTimeout(operationTimeout);
}
@Override
public int getOperationTimeout() {
return table.getOperationTimeout();
}
@Override
public void setRpcTimeout(int rpcTimeout) {
table.setRpcTimeout(rpcTimeout);
}
@Override
public int getRpcTimeout() {
return table.getRpcTimeout();
}
}

View File

@ -128,11 +128,21 @@ public class TestHCM {
}
}
public static class SleepCoprocessor extends BaseRegionObserver {
public static final int SLEEP_TIME = 5000;
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
Threads.sleep(SLEEP_TIME);
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
// Up the handlers; this test needs more than usual.
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
TEST_UTIL.startMiniCluster(2);
}
@ -300,11 +310,10 @@ public class TestHCM {
public void testOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
Table t = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, null);
Table t = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM});
if (t instanceof HTable) {
HTable table = (HTable) t;
table.setRpcTimeout(Integer.MAX_VALUE);
// Check that it works if the timeout is big enough
table.setOperationTimeout(120 * 1000);
table.get(new Get(FAM_NAM));
@ -328,6 +337,21 @@ public class TestHCM {
}
}
@Test(expected = RetriesExhaustedException.class)
public void testRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
assert t instanceof HTable;
HTable table = (HTable) t;
table.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
table.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
table.get(new Get(FAM_NAM));
}
}
private void testConnectionClose(boolean allowsInterrupt) throws Exception {
TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);

View File

@ -321,4 +321,24 @@ public class RegionAsTable implements Table {
throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void setOperationTimeout(int operationTimeout) {
throw new UnsupportedOperationException();
}
@Override
public int getOperationTimeout() {
throw new UnsupportedOperationException();
}
@Override
public void setRpcTimeout(int rpcTimeout) {
throw new UnsupportedOperationException();
}
@Override
public int getRpcTimeout() {
throw new UnsupportedOperationException();
}
}