HBASE-16664 Timeout logic in AsyncProcess is broken

Signed-off-by: chenheng <chenheng@apache.org>
This commit is contained in:
Phil Yang 2016-10-09 19:31:45 +08:00 committed by chenheng
parent e2278f9544
commit 8f9fadf021
7 changed files with 259 additions and 71 deletions

View File

@ -259,7 +259,8 @@ class AsyncProcess {
protected final long pause;
protected int numTries;
protected int serverTrackerTimeout;
protected int timeout;
protected int rpcTimeout;
protected int operationTimeout;
protected long primaryCallTimeoutMicroseconds;
/** Whether to log details for batch errors */
private final boolean logBatchErrorDetails;
@ -322,7 +323,9 @@ class AsyncProcess {
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.timeout = rpcTimeout;
this.rpcTimeout = rpcTimeout;
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
@ -378,6 +381,14 @@ class AsyncProcess {
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
}
public void setRpcTimeout(int rpcTimeout) {
this.rpcTimeout = rpcTimeout;
}
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
}
/**
* @return pool if non null, otherwise returns this.pool if non null, otherwise throws
* RuntimeException
@ -570,12 +581,12 @@ class AsyncProcess {
*/
public <CResult> AsyncRequestFuture submitAll(TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
return submitAll(null, tableName, rows, callback, results, null, timeout);
return submitAll(null, tableName, rows, callback, results, null, operationTimeout, rpcTimeout);
}
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
return submitAll(pool, tableName, rows, callback, results, null, timeout);
return submitAll(pool, tableName, rows, callback, results, null, operationTimeout, rpcTimeout);
}
/**
* Submit immediately the list of rows, whatever the server status. Kept for backward
@ -589,7 +600,7 @@ class AsyncProcess {
*/
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
PayloadCarryingServerCallable callable, int curTimeout) {
PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
// The position will be used by the processBatch to match the object array returned.
@ -609,7 +620,7 @@ class AsyncProcess {
}
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
callable, curTimeout);
callable, operationTimeout, rpcTimeout);
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
@ -779,12 +790,12 @@ class AsyncProcess {
if (callable == null) {
callable = createCallable(server, tableName, multiAction);
}
RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
RpcRetryingCaller<MultiResponse> caller = createCaller(callable, rpcTimeout);
try {
if (callsInProgress != null) {
callsInProgress.add(callable);
}
res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
res = caller.callWithoutRetries(callable, operationTimeout);
if (res == null) {
// Cancelled
return;
@ -850,11 +861,15 @@ class AsyncProcess {
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
private PayloadCarryingServerCallable currentCallable;
private int currentCallTotalTimeout;
private int operationTimeout;
private int rpcTimeout;
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
private RetryingTimeTracker tracker;
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable, int timeout) {
Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable,
int operationTimeout, int rpcTimeout) {
this.pool = pool;
this.callback = callback;
this.nonceGroup = nonceGroup;
@ -924,7 +939,12 @@ class AsyncProcess {
this.errorsByServer = createServerErrorTracker();
this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
this.currentCallable = callable;
this.currentCallTotalTimeout = timeout;
this.operationTimeout = operationTimeout;
this.rpcTimeout = rpcTimeout;
if (callable == null) {
tracker = new RetryingTimeTracker();
tracker.start();
}
}
public Set<PayloadCarryingServerCallable> getCallsInProgress() {
@ -1759,6 +1779,16 @@ class AsyncProcess {
waitUntilDone();
return results;
}
/**
* Create a callable. Isolated to be easily overridden in the tests.
*/
@VisibleForTesting
protected MultiServerCallable<Row> createCallable(final ServerName server,
TableName tableName, final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(connection, tableName, server,
AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker);
}
}
@VisibleForTesting
@ -1781,10 +1811,10 @@ class AsyncProcess {
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults,
PayloadCarryingServerCallable callable, int curTimeout) {
PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) {
return new AsyncRequestFutureImpl<CResult>(
tableName, actions, nonceGroup, getPool(pool), needResults,
results, callback, callable, curTimeout);
results, callback, callable, operationTimeout, rpcTimeout);
}
@VisibleForTesting
@ -1793,24 +1823,17 @@ class AsyncProcess {
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
return createAsyncRequestFuture(
tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout);
}
/**
* Create a callable. Isolated to be easily overridden in the tests.
*/
@VisibleForTesting
protected MultiServerCallable<Row> createCallable(final ServerName server,
TableName tableName, final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
tableName, actions, nonceGroup, pool, callback, results, needResults, null,
operationTimeout, rpcTimeout);
}
/**
* Create a caller. Isolated to be easily overridden in the tests.
*/
@VisibleForTesting
protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable) {
return rpcCallerFactory.<MultiResponse> newCaller();
protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable,
int rpcTimeout) {
return rpcCallerFactory.<MultiResponse> newCaller(rpcTimeout);
}
@VisibleForTesting

View File

@ -81,6 +81,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
private boolean closed = false;
private final ExecutorService pool;
private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
private int operationTimeout;
@VisibleForTesting
protected AsyncProcess ap; // non-final so can be overridden in test
@ -106,7 +107,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.operationTimeout = conn.getConfiguration().getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
}
@ -281,6 +284,16 @@ public class BufferedMutatorImpl implements BufferedMutator {
return this.writeBufferSize;
}
public void setRpcTimeout(int writeRpcTimeout) {
this.writeRpcTimeout = writeRpcTimeout;
this.ap.setRpcTimeout(writeRpcTimeout);
}
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
this.ap.setOperationTimeout(operationTimeout);
}
/**
* This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should not beÓ
* called from production uses.

View File

@ -911,9 +911,10 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
public void batch(final List<? extends Row> actions, final Object[] results, int timeout)
public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
throws InterruptedException, IOException {
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout);
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null,
operationTimeout, rpcTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@ -1055,13 +1056,12 @@ public class HTable implements HTableInterface, RegionLocator {
*/
@Override
public void mutateRow(final RowMutations rm) throws IOException {
final RetryingTimeTracker tracker = new RetryingTimeTracker();
final RetryingTimeTracker tracker = new RetryingTimeTracker().start();
PayloadCarryingServerCallable<MultiResponse> callable =
new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory) {
@Override
public MultiResponse call(int callTimeout) throws IOException {
tracker.start();
controller.setPriority(tableName);
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
@ -1091,7 +1091,7 @@ public class HTable implements HTableInterface, RegionLocator {
}
};
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
null, null, callable, operationTimeout);
null, null, callable, operationTimeout, writeRpcTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@ -1364,13 +1364,12 @@ public class HTable implements HTableInterface, RegionLocator {
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final RowMutations rm)
throws IOException {
final RetryingTimeTracker tracker = new RetryingTimeTracker();
final RetryingTimeTracker tracker = new RetryingTimeTracker().start();
PayloadCarryingServerCallable<MultiResponse> callable =
new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory) {
@Override
public MultiResponse call(int callTimeout) throws IOException {
tracker.start();
controller.setPriority(tableName);
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
@ -1404,7 +1403,7 @@ public class HTable implements HTableInterface, RegionLocator {
* */
Object[] results = new Object[rm.getMutations().size()];
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
null, results, callable, operationTimeout);
null, results, callable, operationTimeout, writeRpcTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@ -1809,6 +1808,10 @@ public class HTable implements HTableInterface, RegionLocator {
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
if (mutator != null) {
mutator.setOperationTimeout(operationTimeout);
}
multiAp.setOperationTimeout(operationTimeout);
}
public int getOperationTimeout() {
@ -1824,8 +1827,8 @@ public class HTable implements HTableInterface, RegionLocator {
@Override
@Deprecated
public void setRpcTimeout(int rpcTimeout) {
this.readRpcTimeout = rpcTimeout;
this.writeRpcTimeout = rpcTimeout;
setWriteRpcTimeout(rpcTimeout);
setReadRpcTimeout(rpcTimeout);
}
@Override
@ -1836,6 +1839,10 @@ public class HTable implements HTableInterface, RegionLocator {
@Override
public void setWriteRpcTimeout(int writeRpcTimeout) {
this.writeRpcTimeout = writeRpcTimeout;
if (mutator != null) {
mutator.setRpcTimeout(writeRpcTimeout);
}
multiAp.setRpcTimeout(writeRpcTimeout);
}
@Override
@ -1973,6 +1980,8 @@ public class HTable implements HTableInterface, RegionLocator {
.writeBufferSize(connConfiguration.getWriteBufferSize())
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
);
mutator.setRpcTimeout(writeRpcTimeout);
mutator.setOperationTimeout(operationTimeout);
}
return mutator;
}

View File

@ -52,9 +52,12 @@ import com.google.protobuf.ServiceException;
class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> {
private final MultiAction<R> multiAction;
private final boolean cellBlock;
private final RetryingTimeTracker tracker;
private final int rpcTimeout;
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi,
int rpcTimeout, RetryingTimeTracker tracker) {
super(connection, tableName, null, rpcFactory);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
@ -62,6 +65,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
// we will store the server here, and throw if someone tries to obtain location/regioninfo.
this.location = new HRegionLocation(null, location);
this.cellBlock = isCellBlock();
this.tracker = tracker;
this.rpcTimeout = rpcTimeout;
}
@Override
@ -79,7 +84,13 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
}
@Override
public MultiResponse call(int callTimeout) throws IOException {
public MultiResponse call(int operationTimeout) throws IOException {
int remainingTime = tracker.getRemainingTime(operationTimeout);
if (remainingTime <= 1) {
// "1" is a special return value in RetryingTimeTracker, see its implementation.
throw new DoNotRetryIOException("Operation Timeout");
}
int callTimeout = Math.min(rpcTimeout, remainingTime);
int countOfActions = this.multiAction.size();
if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();

View File

@ -25,10 +25,11 @@ class RetryingTimeTracker {
private long globalStartTime = -1;
public void start() {
public RetryingTimeTracker start() {
if (this.globalStartTime < 0) {
this.globalStartTime = EnvironmentEdgeManager.currentTime();
}
return this;
}
public int getRemainingTime(int callTimeout) {

View File

@ -211,9 +211,10 @@ public class TestAsyncProcess {
@Override
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
PayloadCarryingServerCallable callable, int curTimeout) {
previousTimeout = curTimeout;
return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout);
PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) {
previousTimeout = rpcTimeout;
return super.submitAll(pool, tableName, rows, callback, results, callable, operationTimeout,
rpcTimeout);
}
@Override
@ -222,7 +223,7 @@ public class TestAsyncProcess {
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
PayloadCarryingServerCallable callable) {
PayloadCarryingServerCallable callable, int rpcTimeout) {
callsCt.incrementAndGet();
MultiServerCallable callable1 = (MultiServerCallable) callable;
final MultiResponse mr = createMultiResponse(
@ -285,7 +286,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
PayloadCarryingServerCallable callable) {
PayloadCarryingServerCallable callable, int rpcTimeout) {
callsCt.incrementAndGet();
return new CallerWithFailure(ioe);
}
@ -336,7 +337,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
PayloadCarryingServerCallable payloadCallable) {
PayloadCarryingServerCallable payloadCallable, int rpcTimeout) {
MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
@ -147,6 +148,36 @@ public class TestHCM {
throw new IOException("first call I fail");
}
}
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(sleepTime.get());
if (ct.incrementAndGet() == 1) {
throw new IOException("first call I fail");
}
}
@Override
public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
final Delete delete,
final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(sleepTime.get());
if (ct.incrementAndGet() == 1) {
throw new IOException("first call I fail");
}
}
@Override
public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment) throws IOException {
Threads.sleep(sleepTime.get());
if (ct.incrementAndGet() == 1) {
throw new IOException("first call I fail");
}
return super.preIncrement(e, increment);
}
}
public static class SleepCoprocessor extends BaseRegionObserver {
@ -162,16 +193,20 @@ public class TestHCM {
final Put put, final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(SLEEP_TIME);
}
}
public static class SleepWriteCoprocessor extends BaseRegionObserver {
public static final int SLEEP_TIME = 5000;
@Override
public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment) throws IOException {
Threads.sleep(SLEEP_TIME);
return super.preIncrement(e, increment);
}
@Override
public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete,
final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(SLEEP_TIME);
}
}
public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver {
@ -365,11 +400,12 @@ public class TestHCM {
* timeouted when the server answers.
*/
@Test
public void testOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
public void testGetOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout");
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
table.setRpcTimeout(Integer.MAX_VALUE);
SleepAndFailFirstTime.ct.set(0);
// Check that it works if the timeout is big enough
table.setOperationTimeout(120 * 1000);
table.get(new Get(FAM_NAM));
@ -392,6 +428,62 @@ public class TestHCM {
}
}
@Test
public void testPutOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout");
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration());
table.setRpcTimeout(Integer.MAX_VALUE);
SleepAndFailFirstTime.ct.set(0);
// Check that it works if the timeout is big enough
table.setOperationTimeout(120 * 1000);
table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
// Resetting and retrying. Will fail this time, not enough time for the second try
SleepAndFailFirstTime.ct.set(0);
try {
table.setOperationTimeout(30 * 1000);
table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
Assert.fail("We expect an exception here");
} catch (RetriesExhaustedWithDetailsException e) {
// The client has a CallTimeout class, but it's not shared.We're not very clean today,
// in the general case you can expect the call to stop, but the exception may vary.
// In this test however, we're sure that it will be a socket timeout.
LOG.info("We received an exception, as expected ", e);
} catch (IOException e) {
Assert.fail("Wrong exception:" + e.getMessage());
} finally {
table.close();
}
}
@Test
public void testDeleteOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout");
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration());
table.setRpcTimeout(Integer.MAX_VALUE);
SleepAndFailFirstTime.ct.set(0);
// Check that it works if the timeout is big enough
table.setOperationTimeout(120 * 1000);
table.delete(new Delete(FAM_NAM));
// Resetting and retrying. Will fail this time, not enough time for the second try
SleepAndFailFirstTime.ct.set(0);
try {
table.setOperationTimeout(30 * 1000);
table.delete(new Delete(FAM_NAM));
Assert.fail("We expect an exception here");
} catch (IOException e) {
// The client has a CallTimeout class, but it's not shared.We're not very clean today,
// in the general case you can expect the call to stop, but the exception may vary.
// In this test however, we're sure that it will be a socket timeout.
LOG.info("We received an exception, as expected ", e);
} finally {
table.close();
}
}
@Test
public void testRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
@ -420,14 +512,14 @@ public class TestHCM {
}
@Test
public void testWriteRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout");
hdt.addCoprocessor(SleepWriteCoprocessor.class.getName());
public void testIncrementRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testIncrementRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100);
t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
Increment i = new Increment(FAM_NAM);
i.addColumn(FAM_NAM, FAM_NAM, 1);
t.increment(i);
@ -437,7 +529,7 @@ public class TestHCM {
}
// Again, with configuration based override
c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2);
c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
try (Connection conn = ConnectionFactory.createConnection(c)) {
try (Table t = conn.getTable(hdt.getTableName())) {
Increment i = new Increment(FAM_NAM);
@ -451,8 +543,46 @@ public class TestHCM {
}
@Test
public void testReadRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout");
public void testDeleteRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
Delete d = new Delete(FAM_NAM);
d.addColumn(FAM_NAM, FAM_NAM, 1);
t.delete(d);
fail("Write should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
}
@Test
public void testPutRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
Put p = new Put(FAM_NAM);
p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
t.put(p);
fail("Write should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
}
@Test
public void testGetRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
@ -503,6 +633,7 @@ public class TestHCM {
TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
SleepAndFailFirstTime.ct.set(0);
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
@ -1009,8 +1140,7 @@ public class TestHCM {
curServer.getServerName().getPort(),
conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
table.close();
}