From 30d7eeaefe431bc263519064662e6dc8ad8ff05a Mon Sep 17 00:00:00 2001 From: Vivek Date: Fri, 5 Aug 2016 17:25:06 -0700 Subject: [PATCH] HBASE-15866 Split hbase.rpc.timeout into *.read.timeout and *.write.timeout Signed-off-by: Andrew Purtell Amending-Author: Andrew Purtell --- .../hadoop/hbase/client/AsyncProcess.java | 5 +- .../hbase/client/BufferedMutatorImpl.java | 8 +- .../client/ConnectionImplementation.java | 3 +- .../apache/hadoop/hbase/client/HTable.java | 58 ++++++++---- .../hbase/client/HTableMultiplexer.java | 6 +- .../org/apache/hadoop/hbase/client/Table.java | 43 ++++++++- .../hadoop/hbase/client/TestAsyncProcess.java | 11 ++- .../org/apache/hadoop/hbase/HConstants.java | 13 +++ .../hbase/rest/client/RemoteHTable.java | 22 +++++ .../hadoop/hbase/client/HTableWrapper.java | 14 +++ .../client/HConnectionTestingUtility.java | 6 +- .../apache/hadoop/hbase/client/TestHCM.java | 90 +++++++++++++++++-- .../hbase/regionserver/RegionAsTable.java | 14 +++ 13 files changed, 257 insertions(+), 36 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 451456099aa..1383ca05a06 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -281,7 +281,7 @@ class AsyncProcess { public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, - RpcControllerFactory rpcFactory) { + RpcControllerFactory rpcFactory, int rpcTimeout) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } @@ -297,8 +297,7 @@ class AsyncProcess { // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; - this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.timeout = rpcTimeout; this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index e98ad4e301a..39e4f75c6bf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -72,6 +73,7 @@ public class BufferedMutatorImpl implements BufferedMutator { private final int maxKeyValueSize; private boolean closed = false; private final ExecutorService pool; + private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor @VisibleForTesting protected AsyncProcess ap; // non-final so can be overridden in test @@ -94,8 +96,12 @@ public class BufferedMutatorImpl implements BufferedMutator { this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); + this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory); + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 8dcda13f841..04edd2514a0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1823,7 +1823,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory); + int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fbd9f51cf7f..882e21b15c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -112,7 +112,8 @@ public class HTable implements Table { protected long scannerMaxResultSize; private ExecutorService pool; // For Multi & Scan private int operationTimeout; // global timeout for each blocking method with retrying rpc - private int rpcTimeout; // timeout for each rpc request + private int readRpcTimeout; // timeout for each read rpc request + private int writeRpcTimeout; // timeout for each write rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; @@ -212,8 +213,12 @@ public class HTable implements Table { this.operationTimeout = tableName.isSystemTable() ? connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); - this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { @@ -257,7 +262,7 @@ public class HTable implements Table { @Override public HTableDescriptor getTableDescriptor() throws IOException { HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, - rpcControllerFactory, operationTimeout, rpcTimeout); + rpcControllerFactory, operationTimeout, readRpcTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } @@ -430,7 +435,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.newCaller(readRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -528,7 +533,7 @@ public class HTable implements Table { } } }; - rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -654,7 +659,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -686,7 +691,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -742,7 +747,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -772,7 +777,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -803,7 +808,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -833,7 +838,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -864,7 +869,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1196,13 +1201,34 @@ public class HTable implements Table { } @Override + @Deprecated public int getRpcTimeout() { - return rpcTimeout; + return readRpcTimeout; } @Override + @Deprecated public void setRpcTimeout(int rpcTimeout) { - this.rpcTimeout = rpcTimeout; + this.readRpcTimeout = rpcTimeout; + this.writeRpcTimeout = rpcTimeout; + } + + @Override + public int getWriteRpcTimeout() { + return writeRpcTimeout; + } + + @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { + this.writeRpcTimeout = writeRpcTimeout; + } + + @Override + public int getReadRpcTimeout() { return readRpcTimeout; } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { + this.readRpcTimeout = readRpcTimeout; } @Override @@ -1282,7 +1308,7 @@ public class HTable implements Table { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration)); + true, RpcControllerFactory.instantiate(configuration), readRpcTimeout); AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index f1bbcb37e6f..ba963c25a5f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -442,6 +442,7 @@ public class HTableMultiplexer { private final ScheduledExecutorService executor; private final int maxRetryInQueue; private final AtomicInteger retryInQueue = new AtomicInteger(0); + private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, @@ -451,7 +452,10 @@ public class HTableMultiplexer { this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory); + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index f2cec97f3c1..4d93442ea6f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -584,18 +584,57 @@ public interface Table extends Closeable { */ int getOperationTimeout(); + /** + * Get timeout (millisecond) of each rpc request in this Table instance. + * + * @returns Currently configured read timeout + * @deprecated Use getReadRpcTimeout or getWriteRpcTimeout instead + */ + @Deprecated + int getRpcTimeout(); + /** * Set timeout (millisecond) of each rpc request in operations of this Table instance, will * override the value of hbase.rpc.timeout in configuration. * If a rpc request waiting too long, it will stop waiting and send a new request to retry until * retries exhausted or operation timeout reached. + *

+ * NOTE: This will set both the read and write timeout settings to the provided value. + * * @param rpcTimeout the timeout of each rpc request in millisecond. + * + * @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead */ + @Deprecated void setRpcTimeout(int rpcTimeout); /** - * Get timeout (millisecond) of each rpc request in this Table instance. + * Get timeout (millisecond) of each rpc read request in this Table instance. */ - int getRpcTimeout(); + int getReadRpcTimeout(); + /** + * Set timeout (millisecond) of each rpc read request in operations of this Table instance, will + * override the value of hbase.rpc.read.timeout in configuration. + * If a rpc read request waiting too long, it will stop waiting and send a new request to retry + * until retries exhausted or operation timeout reached. + * + * @param readRpcTimeout + */ + void setReadRpcTimeout(int readRpcTimeout); + + /** + * Get timeout (millisecond) of each rpc write request in this Table instance. + */ + int getWriteRpcTimeout(); + + /** + * Set timeout (millisecond) of each rpc write request in operations of this Table instance, will + * override the value of hbase.rpc.write.timeout in configuration. + * If a rpc write request waiting too long, it will stop waiting and send a new request to retry + * until retries exhausted or operation timeout reached. + * + * @param writeRpcTimeout + */ + void setWriteRpcTimeout(int writeRpcTimeout); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index d94331605e3..0aa97045cd9 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -138,6 +138,7 @@ public class TestAsyncProcess { final AtomicInteger nbActions = new AtomicInteger(); public List allReqs = new ArrayList(); public AtomicInteger callsCt = new AtomicInteger(); + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @Override protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, @@ -157,14 +158,14 @@ public class TestAsyncProcess { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); } public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, @@ -176,7 +177,7 @@ public class TestAsyncProcess { throw new RejectedExecutionException("test under failure"); } }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); } @Override @@ -1111,10 +1112,12 @@ public class TestAsyncProcess { } static class AsyncProcessForThrowableCheck extends AsyncProcess { + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, ExecutorService pool) { super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf)); + conf), rpcTimeout); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 256c3740505..ce18ef51264 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -815,9 +815,22 @@ public final class HConstants { /** * timeout for each RPC + * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY} + * instead. */ + @Deprecated public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; + /** + * timeout for each read RPC + */ + public static final String HBASE_RPC_READ_TIMEOUT_KEY = "hbase.rpc.read.timeout"; + + /** + * timeout for each write RPC + */ + public static final String HBASE_RPC_WRITE_TIMEOUT_KEY = "hbase.rpc.write.timeout"; + /** * Default value of {@link #HBASE_RPC_TIMEOUT_KEY} */ diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index b9e393e8a01..33c2fc2ca9f 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -860,12 +860,34 @@ public class RemoteHTable implements Table { } @Override + @Deprecated public void setRpcTimeout(int rpcTimeout) { throw new UnsupportedOperationException(); } @Override + @Deprecated public int getRpcTimeout() { throw new UnsupportedOperationException(); } + + @Override + public int getReadRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { + throw new UnsupportedOperationException(); + } + + @Override + public int getWriteRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { + throw new UnsupportedOperationException(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index 5da0df76a90..6a732612c15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -308,12 +308,26 @@ public final class HTableWrapper implements Table { } @Override + @Deprecated public void setRpcTimeout(int rpcTimeout) { table.setRpcTimeout(rpcTimeout); } @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { table.setWriteRpcTimeout(writeRpcTimeout); } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { table.setReadRpcTimeout(readRpcTimeout); } + + @Override + @Deprecated public int getRpcTimeout() { return table.getRpcTimeout(); } + + @Override + public int getWriteRpcTimeout() { return table.getWriteRpcTimeout(); } + + @Override + public int getReadRpcTimeout() { return table.getReadRpcTimeout(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 265e3c1c352..036b38fbf93 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; @@ -126,7 +127,8 @@ public class HConnectionTestingUtility { Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getAsyncProcess()).thenReturn( new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, - RpcControllerFactory.instantiate(conf))); + RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT))); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); @@ -194,4 +196,4 @@ public class HConnectionTestingUtility { return result; } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 1b20b766224..4d47bdea4c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -18,11 +18,7 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.IOException; import java.lang.reflect.Field; @@ -149,6 +145,16 @@ public class TestHCM { } } + public static class SleepWriteCoprocessor extends BaseRegionObserver { + public static final int SLEEP_TIME = 5000; + @Override + public Result preIncrement(final ObserverContext e, + final Increment increment) throws IOException { + Threads.sleep(SLEEP_TIME); + return super.preIncrement(e, increment); + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); @@ -351,7 +357,7 @@ public class TestHCM { } } - @Test(expected = RetriesExhaustedException.class) + @Test public void testRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); hdt.addCoprocessor(SleepCoprocessor.class.getName()); @@ -361,6 +367,78 @@ public class TestHCM { t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + // Again, with configuration based override + c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); + try (Connection conn = ConnectionFactory.createConnection(c)) { + try (Table t = conn.getTable(hdt.getTableName())) { + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + } + } + + @Test + public void testWriteRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout"); + hdt.addCoprocessor(SleepWriteCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100); + Increment i = new Increment(FAM_NAM); + i.addColumn(FAM_NAM, FAM_NAM, 1); + t.increment(i); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + // Again, with configuration based override + c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2); + try (Connection conn = ConnectionFactory.createConnection(c)) { + try (Table t = conn.getTable(hdt.getTableName())) { + Increment i = new Increment(FAM_NAM); + i.addColumn(FAM_NAM, FAM_NAM, 1); + t.increment(i); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + } + } + + @Test + public void testReadRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setReadRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + // Again, with configuration based override + c.setInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); + try (Connection conn = ConnectionFactory.createConnection(c)) { + try (Table t = conn.getTable(hdt.getTableName())) { + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java index 770c39b82d9..d2e78b77dcf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java @@ -333,12 +333,26 @@ public class RegionAsTable implements Table { } @Override + @Deprecated public void setRpcTimeout(int rpcTimeout) { throw new UnsupportedOperationException(); } @Override + public void setWriteRpcTimeout(int writeRpcTimeout) {throw new UnsupportedOperationException(); } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) {throw new UnsupportedOperationException(); } + + @Override + @Deprecated public int getRpcTimeout() { throw new UnsupportedOperationException(); } + + @Override + public int getWriteRpcTimeout() { throw new UnsupportedOperationException(); } + + @Override + public int getReadRpcTimeout() { throw new UnsupportedOperationException(); } } \ No newline at end of file