diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index f0a212f4b41..bf85ada2b2b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -214,6 +214,7 @@ public class HBaseAdmin implements Admin { private boolean cleanupConnectionOnClose = false; // close the connection in close() private boolean closed = false; private int operationTimeout; + private int rpcTimeout; private RpcRetryingCallerFactory rpcCallerFactory; private RpcControllerFactory rpcControllerFactory; @@ -273,6 +274,8 @@ public class HBaseAdmin implements Admin { "hbase.client.retries.longer.multiplier", 10); this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.syncWaitTimeout = this.conf.getInt( "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min @@ -543,12 +546,12 @@ public class HBaseAdmin implements Admin { public HTableDescriptor getTableDescriptor(final TableName tableName) throws TableNotFoundException, IOException { return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory, - operationTimeout); + operationTimeout, rpcTimeout); } static HTableDescriptor getTableDescriptor(final TableName tableName, HConnection connection, RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, - int operationTimeout) throws TableNotFoundException, IOException { + int operationTimeout, int rpcTimeout) throws TableNotFoundException, IOException { if (tableName == null) return null; HTableDescriptor htd = executeCallable(new MasterCallable(connection) { @@ -566,7 +569,7 @@ public class HBaseAdmin implements Admin { 
} return null; } - }, rpcCallerFactory, operationTimeout); + }, rpcCallerFactory, operationTimeout, rpcTimeout); if (htd != null) { return htd; } @@ -4373,12 +4376,13 @@ public class HBaseAdmin implements Admin { } private V executeCallable(MasterCallable callable) throws IOException { - return executeCallable(callable, rpcCallerFactory, operationTimeout); + return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout); } private static V executeCallable(MasterCallable callable, - RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout) throws IOException { - RpcRetryingCaller caller = rpcCallerFactory.newCaller(); + RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout) + throws IOException { + RpcRetryingCaller caller = rpcCallerFactory.newCaller(rpcTimeout); try { return caller.callWithRetries(callable, operationTimeout); } finally { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fe006ba43e1..efa03c60250 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -124,7 +124,8 @@ public class HTable implements HTableInterface, RegionLocator { protected int scannerCaching; protected long scannerMaxResultSize; private ExecutorService pool; // For Multi & Scan - private int operationTimeout; + private int operationTimeout; // global timeout for each blocking method with retrying rpc + private int rpcTimeout; // timeout for each rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; @@ -353,9 +354,10 @@ public class HTable implements HTableInterface, RegionLocator { if (connConfiguration == null) { connConfiguration = new 
ConnectionConfiguration(configuration); } - this.operationTimeout = tableName.isSystemTable() ? connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { @@ -571,7 +573,7 @@ public class HTable implements HTableInterface, RegionLocator { @Override public HTableDescriptor getTableDescriptor() throws IOException { HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, - rpcControllerFactory, operationTimeout); + rpcControllerFactory, operationTimeout, rpcTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } @@ -753,7 +755,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -858,7 +861,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } // Call that takes into account the replica @@ -974,7 +978,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1103,7 +1108,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. 
newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1134,7 +1140,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1204,7 +1211,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1233,7 +1241,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1263,7 +1272,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1292,7 +1302,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1322,7 +1333,8 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. 
newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1794,6 +1806,14 @@ public class HTable implements HTableInterface, RegionLocator { return operationTimeout; } + @Override public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + } + + @Override public int getRpcTimeout() { + return rpcTimeout; + } + @Override public String toString() { return tableName + ";" + connection; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index ff13c8c5200..2d183673f6a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -669,5 +669,21 @@ public class HTablePool implements Closeable { checkState(); return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation); } + + @Override public void setOperationTimeout(int operationTimeout) { + table.setOperationTimeout(operationTimeout); + } + + @Override public int getOperationTimeout() { + return table.getOperationTimeout(); + } + + @Override public void setRpcTimeout(int rpcTimeout) { + table.setRpcTimeout(rpcTimeout); + } + + @Override public int getRpcTimeout() { + return table.getRpcTimeout(); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index bb19d5fbf34..bfa21dafba9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -63,21 +63,23 @@ public class RpcRetryingCaller { private final long pause; private final int retries; + private final int rpcTimeout;// timeout for each rpc request private final AtomicBoolean cancelled = new AtomicBoolean(false); private final 
RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptorContext context; public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) { - this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt); + this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); } public RpcRetryingCaller(long pause, int retries, - RetryingCallerInterceptor interceptor, int startLogErrorsCnt) { + RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { this.pause = pause; this.retries = retries; this.interceptor = interceptor; context = interceptor.createEmptyContext(); this.startLogErrorsCnt = startLogErrorsCnt; + this.rpcTimeout = rpcTimeout; } private int getRemainingTime(int callTimeout) { @@ -97,6 +99,14 @@ public class RpcRetryingCaller { } } + private int getTimeout(int callTimeout){ + int timeout = getRemainingTime(callTimeout); + if (timeout <= 0 || rpcTimeout > 0 && rpcTimeout < timeout){ + timeout = rpcTimeout; + } + return timeout; + } + public void cancel(){ synchronized (cancelled){ cancelled.set(true); @@ -123,7 +133,7 @@ public class RpcRetryingCaller { try { callable.prepare(tries != 0); // if called with false, check table status on ZK interceptor.intercept(context.prepare(callable, tries)); - return callable.call(getRemainingTime(callTimeout)); + return callable.call(getTimeout(callTimeout)); } catch (PreemptiveFastFailException e) { throw e; } catch (Throwable t) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index dac6bed3b2e..6273624f051 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -33,6 +33,7 @@ public class RpcRetryingCallerFactory { protected 
final Configuration conf; private final long pause; private final int retries; + private final int rpcTimeout; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; private final boolean enableBackPressure; @@ -53,6 +54,7 @@ public class RpcRetryingCallerFactory { this.interceptor = interceptor; enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT); } /** @@ -62,11 +64,25 @@ public class RpcRetryingCallerFactory { this.stats = statisticTracker; } + /** + * Create a new RetryingCaller with specific rpc timeout. + */ + public RpcRetryingCaller newCaller(int rpcTimeout) { + // We store the values in the factory instance. This way, constructing new objects + // is cheap as it does not require parsing a complex structure. + RpcRetryingCaller caller = new RpcRetryingCaller(pause, retries, interceptor, + startLogErrorsCnt, rpcTimeout); + return caller; + } + + /** + * Create a new RetryingCaller with configured rpc timeout. + */ public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. 
RpcRetryingCaller caller = new RpcRetryingCaller(pause, retries, interceptor, - startLogErrorsCnt); + startLogErrorsCnt, rpcTimeout); return caller; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 9a6744bf5a2..8c6169da28e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -595,4 +595,35 @@ public interface Table extends Closeable { */ boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException; + + /** + * Set timeout (millisecond) of each operation in this Table instance, will override the value + * of hbase.client.operation.timeout in configuration. + * Operation timeout is a top-level restriction that makes sure a blocking method will not be + * blocked more than this. In each operation, if an rpc request fails because of timeout or + * other reason, it will retry until success or throw a RetriesExhaustedException. But if the + * total blocking time reaches the operation timeout before retries are exhausted, it will break + * early and throw SocketTimeoutException. + * @param operationTimeout the total timeout of each operation in millisecond. + */ + public void setOperationTimeout(int operationTimeout); + + /** + * Get timeout (millisecond) of each operation in this Table instance. + */ + public int getOperationTimeout(); + + /** + * Set timeout (millisecond) of each rpc request in operations of this Table instance, will + * override the value of hbase.rpc.timeout in configuration. + * If an rpc request waits too long, it will stop waiting and send a new request to retry until + * retries are exhausted or the operation timeout is reached. + * @param rpcTimeout the timeout of each rpc request in millisecond. 
+ */ + public void setRpcTimeout(int rpcTimeout); + + /** + * Get timeout (millisecond) of each rpc request in this Table instance. + */ + public int getRpcTimeout(); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 7a77c7e3844..9d2b6fc6a13 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -283,10 +283,10 @@ public final class HConstants { /** Parameter name for HBase client IPC pool size */ public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size"; - /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ + /** Parameter name for HBase client operation timeout. */ public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout"; - /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ + /** Parameter name for HBase client meta operation timeout. */ public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT = "hbase.client.meta.operation.timeout"; diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index fae986edb66..56b915e45c5 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -844,10 +844,19 @@ possible configurations would overwhelm and obscure the important. hbase.rpc.timeout 60000 - This is for the RPC layer to define how long HBase client applications + This is for the RPC layer to define how long (millisecond) HBase client applications take for a remote call to time out. It uses pings to check connections but will eventually throw a TimeoutException. 
+ + hbase.client.operation.timeout + 1200000 + Operation timeout is a top-level restriction (millisecond) that makes sure a + blocking operation in Table will not be blocked more than this. In each operation, if rpc + request fails because of timeout or other reason, it will retry until success or throw + RetriesExhaustedException. But if the total time being blocking reach the operation timeout + before retries exhausted, it will break early and throw SocketTimeoutException. + hbase.cells.scanned.per.heartbeat.check 10000 diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index b6ed3308ed6..8fa1b8a1f76 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -857,4 +857,20 @@ public class RemoteHTable implements Table { CompareOp compareOp, byte[] value, RowMutations rm) throws IOException { throw new UnsupportedOperationException("checkAndMutate not implemented"); } + + @Override public void setOperationTimeout(int operationTimeout) { + throw new UnsupportedOperationException(); + } + + @Override public int getOperationTimeout() { + throw new UnsupportedOperationException(); + } + + @Override public void setRpcTimeout(int rpcTimeout) { + throw new UnsupportedOperationException(); + } + + @Override public int getRpcTimeout() { + throw new UnsupportedOperationException(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index 1f84bb4e5b2..2d25f63e413 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -365,4 +365,20 @@ public class HTableWrapper implements HTableInterface { 
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException { return table.checkAndMutate(row, family, qualifier, compareOp, value, rm); } + + @Override public void setOperationTimeout(int operationTimeout) { + table.setOperationTimeout(operationTimeout); + } + + @Override public int getOperationTimeout() { + return table.getOperationTimeout(); + } + + @Override public void setRpcTimeout(int rpcTimeout) { + table.setRpcTimeout(rpcTimeout); + } + + @Override public int getRpcTimeout() { + return table.getRpcTimeout(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 1669fc4a253..a8c7113885f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -134,11 +134,21 @@ public class TestHCM { } } + public static class SleepCoprocessor extends BaseRegionObserver { + public static final int SLEEP_TIME = 5000; + @Override + public void preGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + Threads.sleep(SLEEP_TIME); + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); // Up the handlers; this test needs more than usual. 
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); TEST_UTIL.startMiniCluster(2); } @@ -309,7 +319,7 @@ public class TestHCM { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout"); hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); - + table.setRpcTimeout(Integer.MAX_VALUE); // Check that it works if the timeout is big enough table.setOperationTimeout(120 * 1000); table.get(new Get(FAM_NAM)); @@ -332,6 +342,20 @@ public class TestHCM { } } + @Test(expected = RetriesExhaustedException.class) + public void testRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + assert t instanceof HTable; + HTable table = (HTable) t; + table.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + table.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + table.get(new Get(FAM_NAM)); + } + } private void testConnectionClose(boolean allowsInterrupt) throws Exception { TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);