HBASE-15866 Split hbase.rpc.timeout into *.read.timeout and *.write.timeout

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Amending-Author: Andrew Purtell <apurtell@apache.org>

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
This commit is contained in:
Vivek 2016-08-05 17:25:06 -07:00 committed by Andrew Purtell
parent 6b233c4332
commit ec99838b9c
13 changed files with 291 additions and 53 deletions

View File

@ -280,7 +280,8 @@ class AsyncProcess {
}
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory,
int rpcTimeout) {
if (hc == null) {
throw new IllegalArgumentException("HConnection cannot be null.");
}
@ -295,8 +296,7 @@ class AsyncProcess {
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.timeout = rpcTimeout;
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,

View File

@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -71,6 +72,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
private final int maxKeyValueSize;
private boolean closed = false;
private final ExecutorService pool;
private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
@VisibleForTesting
protected AsyncProcess ap; // non-final so can be overridden in test
@ -93,8 +95,12 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
}
@Override

View File

@ -560,6 +560,7 @@ class ConnectionManager {
private final boolean useMetaReplicas;
private final int numTries;
final int rpcTimeout;
final int writeRpcTimeout;
private NonceGenerator nonceGenerator = null;
private final AsyncProcess asyncProcess;
// single tracker per connection
@ -654,6 +655,9 @@ class ConnectionManager {
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.writeRpcTimeout = conf.getInt(
HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
synchronized (nonceGeneratorCreateLock) {
if (ConnectionManager.nonceGenerator == null) {
@ -2340,7 +2344,8 @@ class ConnectionManager {
// For tests to override.
protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available.
return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory);
return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory,
writeRpcTimeout);
}
@Override

View File

@ -125,7 +125,8 @@ public class HTable implements HTableInterface, RegionLocator {
protected long scannerMaxResultSize;
private ExecutorService pool; // For Multi & Scan
private int operationTimeout; // global timeout for each blocking method with retrying rpc
private int rpcTimeout; // timeout for each rpc request
private int readRpcTimeout; // timeout for each read rpc request
private int writeRpcTimeout; // timeout for each write rpc request
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG;
@ -356,8 +357,12 @@ public class HTable implements HTableInterface, RegionLocator {
}
this.operationTimeout = tableName.isSystemTable() ?
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
if (this.rpcCallerFactory == null) {
@ -573,7 +578,7 @@ public class HTable implements HTableInterface, RegionLocator {
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
rpcControllerFactory, operationTimeout, rpcTimeout);
rpcControllerFactory, operationTimeout, readRpcTimeout);
if (htd != null) {
return new UnmodifyableHTableDescriptor(htd);
}
@ -755,7 +760,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -861,7 +866,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -978,7 +983,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -1108,7 +1113,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -1140,7 +1145,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -1211,7 +1216,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -1241,7 +1246,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -1272,7 +1277,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -1302,7 +1307,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -1333,7 +1338,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
};
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@ -1806,12 +1811,35 @@ public class HTable implements HTableInterface, RegionLocator {
return operationTimeout;
}
@Override public void setRpcTimeout(int rpcTimeout) {
this.rpcTimeout = rpcTimeout;
@Override
@Deprecated
public int getRpcTimeout() {
return readRpcTimeout;
}
@Override public int getRpcTimeout() {
return rpcTimeout;
@Override
@Deprecated
public void setRpcTimeout(int rpcTimeout) {
this.readRpcTimeout = rpcTimeout;
this.writeRpcTimeout = rpcTimeout;
}
@Override
public int getWriteRpcTimeout() {
return writeRpcTimeout;
}
@Override
public void setWriteRpcTimeout(int writeRpcTimeout) {
this.writeRpcTimeout = writeRpcTimeout;
}
@Override
public int getReadRpcTimeout() { return readRpcTimeout; }
@Override
public void setReadRpcTimeout(int readRpcTimeout) {
this.readRpcTimeout = readRpcTimeout;
}
@Override
@ -1891,7 +1919,7 @@ public class HTable implements HTableInterface, RegionLocator {
AsyncProcess asyncProcess =
new AsyncProcess(connection, configuration, pool,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
true, RpcControllerFactory.instantiate(configuration));
true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
new Callback<ClientProtos.CoprocessorServiceResult>() {

View File

@ -441,6 +441,7 @@ public class HTableMultiplexer {
private final ScheduledExecutorService executor;
private final int maxRetryInQueue;
private final AtomicInteger retryInQueue = new AtomicInteger(0);
private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
@ -450,7 +451,10 @@ public class HTableMultiplexer {
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
this.executor = executor;
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
}

View File

@ -678,12 +678,36 @@ public class HTablePool implements Closeable {
return table.getOperationTimeout();
}
@Override public void setRpcTimeout(int rpcTimeout) {
@Override
@Deprecated
public void setRpcTimeout(int rpcTimeout) {
table.setRpcTimeout(rpcTimeout);
}
@Override public int getRpcTimeout() {
@Override
@Deprecated
public int getRpcTimeout() {
return table.getRpcTimeout();
}
@Override
public int getReadRpcTimeout() {
return table.getReadRpcTimeout();
}
@Override
public void setReadRpcTimeout(int readRpcTimeout) {
table.setReadRpcTimeout(readRpcTimeout);
}
@Override
public int getWriteRpcTimeout() {
return table.getWriteRpcTimeout();
}
@Override
public void setWriteRpcTimeout(int writeRpcTimeout) {
table.setWriteRpcTimeout(writeRpcTimeout);
}
}
}

View File

@ -613,17 +613,57 @@ public interface Table extends Closeable {
*/
public int getOperationTimeout();
/**
* Get timeout (millisecond) of each rpc request in this Table instance.
*
* @returns Currently configured read timeout
* @deprecated Use getReadRpcTimeout or getWriteRpcTimeout instead
*/
@Deprecated
int getRpcTimeout();
/**
* Set timeout (millisecond) of each rpc request in operations of this Table instance, will
* override the value of hbase.rpc.timeout in configuration.
* If a rpc request waiting too long, it will stop waiting and send a new request to retry until
* retries exhausted or operation timeout reached.
* <p>
* NOTE: This will set both the read and write timeout settings to the provided value.
*
* @param rpcTimeout the timeout of each rpc request in millisecond.
*
* @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead
*/
public void setRpcTimeout(int rpcTimeout);
@Deprecated
void setRpcTimeout(int rpcTimeout);
/**
* Get timeout (millisecond) of each rpc request in this Table instance.
* Get timeout (millisecond) of each rpc read request in this Table instance.
*/
public int getRpcTimeout();
int getReadRpcTimeout();
/**
* Set timeout (millisecond) of each rpc read request in operations of this Table instance, will
* override the value of hbase.rpc.read.timeout in configuration.
* If a rpc read request waiting too long, it will stop waiting and send a new request to retry
* until retries exhausted or operation timeout reached.
*
* @param readRpcTimeout
*/
void setReadRpcTimeout(int readRpcTimeout);
/**
* Get timeout (millisecond) of each rpc write request in this Table instance.
*/
int getWriteRpcTimeout();
/**
* Set timeout (millisecond) of each rpc write request in operations of this Table instance, will
* override the value of hbase.rpc.write.timeout in configuration.
* If a rpc write request waiting too long, it will stop waiting and send a new request to retry
* until retries exhausted or operation timeout reached.
*
* @param writeRpcTimeout
*/
void setWriteRpcTimeout(int writeRpcTimeout);
}

View File

@ -135,6 +135,7 @@ public class TestAsyncProcess {
final AtomicInteger nbActions = new AtomicInteger();
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
public AtomicInteger callsCt = new AtomicInteger();
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
@ -155,14 +156,14 @@ public class TestAsyncProcess {
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout);
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
@ -174,7 +175,7 @@ public class TestAsyncProcess {
throw new RejectedExecutionException("test under failure");
}
},
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
}
@Override
@ -1115,10 +1116,12 @@ public class TestAsyncProcess {
}
static class AsyncProcessForThrowableCheck extends AsyncProcess {
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
ExecutorService pool) {
super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
conf));
conf), rpcTimeout);
}
}

View File

@ -799,9 +799,22 @@ public final class HConstants {
/**
* timeout for each RPC
* @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY}
* instead.
*/
@Deprecated
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
/**
* timeout for each read RPC
*/
public static final String HBASE_RPC_READ_TIMEOUT_KEY = "hbase.rpc.read.timeout";
/**
* timeout for each write RPC
*/
public static final String HBASE_RPC_WRITE_TIMEOUT_KEY = "hbase.rpc.write.timeout";
/**
* Default value of {@link #HBASE_RPC_TIMEOUT_KEY}
*/

View File

@ -866,11 +866,35 @@ public class RemoteHTable implements Table {
throw new UnsupportedOperationException();
}
@Override public void setRpcTimeout(int rpcTimeout) {
@Override
@Deprecated
public void setRpcTimeout(int rpcTimeout) {
throw new UnsupportedOperationException();
}
@Override public int getRpcTimeout() {
@Override
@Deprecated
public int getRpcTimeout() {
throw new UnsupportedOperationException();
}
@Override
public int getReadRpcTimeout() {
throw new UnsupportedOperationException();
}
@Override
public void setReadRpcTimeout(int readRpcTimeout) {
throw new UnsupportedOperationException();
}
@Override
public int getWriteRpcTimeout() {
throw new UnsupportedOperationException();
}
@Override
public void setWriteRpcTimeout(int writeRpcTimeout) {
throw new UnsupportedOperationException();
}
}

View File

@ -374,11 +374,25 @@ public class HTableWrapper implements HTableInterface {
return table.getOperationTimeout();
}
@Override public void setRpcTimeout(int rpcTimeout) {
@Override
@Deprecated
public int getRpcTimeout() { return table.getRpcTimeout(); }
@Override
@Deprecated
public void setRpcTimeout(int rpcTimeout) {
table.setRpcTimeout(rpcTimeout);
}
@Override public int getRpcTimeout() {
return table.getRpcTimeout();
}
@Override
public void setWriteRpcTimeout(int writeRpcTimeout) { table.setWriteRpcTimeout(writeRpcTimeout); }
@Override
public void setReadRpcTimeout(int readRpcTimeout) { table.setReadRpcTimeout(readRpcTimeout); }
@Override
public int getWriteRpcTimeout() { return table.getWriteRpcTimeout(); }
@Override
public int getReadRpcTimeout() { return table.getReadRpcTimeout(); }
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
@ -154,7 +155,8 @@ public class HConnectionTestingUtility {
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
Mockito.when(c.getAsyncProcess()).thenReturn(
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
RpcControllerFactory.instantiate(conf)));
RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
Mockito.doNothing().when(c).incCount();
Mockito.doNothing().when(c).decCount();
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(

View File

@ -18,12 +18,7 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.reflect.Field;
@ -155,6 +150,16 @@ public class TestHCM {
}
}
public static class SleepWriteCoprocessor extends BaseRegionObserver {
public static final int SLEEP_TIME = 5000;
@Override
public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment) throws IOException {
Threads.sleep(SLEEP_TIME);
return super.preIncrement(e, increment);
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
@ -348,18 +353,88 @@ public class TestHCM {
}
}
@Test(expected = RetriesExhaustedException.class)
@Test
public void testRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
assert t instanceof HTable;
HTable table = (HTable) t;
table.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
table.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
table.get(new Get(FAM_NAM));
t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
t.get(new Get(FAM_NAM));
fail("Get should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
// Again, with configuration based override
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
try (Connection conn = ConnectionFactory.createConnection(c)) {
try (Table t = conn.getTable(hdt.getTableName())) {
t.get(new Get(FAM_NAM));
fail("Get should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
}
}
@Test
public void testWriteRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout");
hdt.addCoprocessor(SleepWriteCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100);
Increment i = new Increment(FAM_NAM);
i.addColumn(FAM_NAM, FAM_NAM, 1);
t.increment(i);
fail("Write should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
// Again, with configuration based override
c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2);
try (Connection conn = ConnectionFactory.createConnection(c)) {
try (Table t = conn.getTable(hdt.getTableName())) {
Increment i = new Increment(FAM_NAM);
i.addColumn(FAM_NAM, FAM_NAM, 1);
t.increment(i);
fail("Write should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
}
}
@Test
public void testReadRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
t.setReadRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
t.get(new Get(FAM_NAM));
fail("Get should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
// Again, with configuration based override
c.setInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
try (Connection conn = ConnectionFactory.createConnection(c)) {
try (Table t = conn.getTable(hdt.getTableName())) {
t.get(new Get(FAM_NAM));
fail("Get should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
}
}