From 8f9fadf0216977996564ec56347a91e5a0a8b945 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Sun, 9 Oct 2016 19:31:45 +0800 Subject: [PATCH] HBASE-16664 Timeout logic in AsyncProcess is broken Signed-off-by: chenheng --- .../hadoop/hbase/client/AsyncProcess.java | 73 ++++--- .../hbase/client/BufferedMutatorImpl.java | 15 +- .../apache/hadoop/hbase/client/HTable.java | 29 ++- .../hbase/client/MultiServerCallable.java | 15 +- .../hbase/client/RetryingTimeTracker.java | 3 +- .../hadoop/hbase/client/TestAsyncProcess.java | 13 +- .../apache/hadoop/hbase/client/TestHCM.java | 182 +++++++++++++++--- 7 files changed, 259 insertions(+), 71 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 647a466b155..b0652a7ad7b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -259,7 +259,8 @@ class AsyncProcess { protected final long pause; protected int numTries; protected int serverTrackerTimeout; - protected int timeout; + protected int rpcTimeout; + protected int operationTimeout; protected long primaryCallTimeoutMicroseconds; /** Whether to log details for batch errors */ private final boolean logBatchErrorDetails; @@ -322,7 +323,9 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.timeout = rpcTimeout; + this.rpcTimeout = rpcTimeout; + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, @@ -378,6 +381,14 @@ class AsyncProcess { 
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); } + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + } + + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + } + /** * @return pool if non null, otherwise returns this.pool if non null, otherwise throws * RuntimeException @@ -570,12 +581,12 @@ class AsyncProcess { */ public AsyncRequestFuture submitAll(TableName tableName, List rows, Batch.Callback callback, Object[] results) { - return submitAll(null, tableName, rows, callback, results, null, timeout); + return submitAll(null, tableName, rows, callback, results, null, operationTimeout, rpcTimeout); } public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results) { - return submitAll(pool, tableName, rows, callback, results, null, timeout); + return submitAll(pool, tableName, rows, callback, results, null, operationTimeout, rpcTimeout); } /** * Submit immediately the list of rows, whatever the server status. Kept for backward @@ -589,7 +600,7 @@ class AsyncProcess { */ public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results, - PayloadCarryingServerCallable callable, int curTimeout) { + PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { List> actions = new ArrayList>(rows.size()); // The position will be used by the processBatch to match the object array returned. 
@@ -609,7 +620,7 @@ class AsyncProcess { } AsyncRequestFutureImpl ars = createAsyncRequestFuture( tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, - callable, curTimeout); + callable, operationTimeout, rpcTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -779,12 +790,12 @@ class AsyncProcess { if (callable == null) { callable = createCallable(server, tableName, multiAction); } - RpcRetryingCaller caller = createCaller(callable); + RpcRetryingCaller caller = createCaller(callable, rpcTimeout); try { if (callsInProgress != null) { callsInProgress.add(callable); } - res = caller.callWithoutRetries(callable, currentCallTotalTimeout); + res = caller.callWithoutRetries(callable, operationTimeout); if (res == null) { // Cancelled return; @@ -850,11 +861,15 @@ class AsyncProcess { private final boolean hasAnyReplicaGets; private final long nonceGroup; private PayloadCarryingServerCallable currentCallable; - private int currentCallTotalTimeout; + private int operationTimeout; + private int rpcTimeout; private final Map> heapSizesByServer = new HashMap<>(); + private RetryingTimeTracker tracker; + public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback, PayloadCarryingServerCallable callable, int timeout) { + Batch.Callback callback, PayloadCarryingServerCallable callable, + int operationTimeout, int rpcTimeout) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -924,7 +939,12 @@ class AsyncProcess { this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? 
globalErrors : new BatchErrors(); this.currentCallable = callable; - this.currentCallTotalTimeout = timeout; + this.operationTimeout = operationTimeout; + this.rpcTimeout = rpcTimeout; + if (callable == null) { + tracker = new RetryingTimeTracker(); + tracker.start(); + } } public Set getCallsInProgress() { @@ -1759,6 +1779,16 @@ class AsyncProcess { waitUntilDone(); return results; } + + /** + * Create a callable. Isolated to be easily overridden in the tests. + */ + @VisibleForTesting + protected MultiServerCallable createCallable(final ServerName server, + TableName tableName, final MultiAction multi) { + return new MultiServerCallable(connection, tableName, server, + AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker); + } } @VisibleForTesting @@ -1781,10 +1811,10 @@ class AsyncProcess { protected AsyncRequestFutureImpl createAsyncRequestFuture( TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults, - PayloadCarryingServerCallable callable, int curTimeout) { + PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { return new AsyncRequestFutureImpl( tableName, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout); + results, callback, callable, operationTimeout, rpcTimeout); } @VisibleForTesting @@ -1793,24 +1823,17 @@ class AsyncProcess { TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults) { return createAsyncRequestFuture( - tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout); - } - - /** - * Create a callable. Isolated to be easily overridden in the tests. 
- */ - @VisibleForTesting - protected MultiServerCallable createCallable(final ServerName server, - TableName tableName, final MultiAction multi) { - return new MultiServerCallable(connection, tableName, server, this.rpcFactory, multi); + tableName, actions, nonceGroup, pool, callback, results, needResults, null, + operationTimeout, rpcTimeout); } /** * Create a caller. Isolated to be easily overridden in the tests. */ @VisibleForTesting - protected RpcRetryingCaller createCaller(PayloadCarryingServerCallable callable) { - return rpcCallerFactory. newCaller(); + protected RpcRetryingCaller createCaller(PayloadCarryingServerCallable callable, + int rpcTimeout) { + return rpcCallerFactory. newCaller(rpcTimeout); } @VisibleForTesting diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index e12b34d7637..1974be3974f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -81,6 +81,7 @@ public class BufferedMutatorImpl implements BufferedMutator { private boolean closed = false; private final ExecutorService pool; private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor + private int operationTimeout; @VisibleForTesting protected AsyncProcess ap; // non-final so can be overridden in test @@ -106,7 +107,9 @@ public class BufferedMutatorImpl implements BufferedMutator { this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - + this.operationTimeout = conn.getConfiguration().getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); // puts need to track errors globally due to how the APIs currently 
work. ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout); } @@ -281,6 +284,16 @@ public class BufferedMutatorImpl implements BufferedMutator { return this.writeBufferSize; } + public void setRpcTimeout(int writeRpcTimeout) { + this.writeRpcTimeout = writeRpcTimeout; + this.ap.setRpcTimeout(writeRpcTimeout); + } + + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + this.ap.setOperationTimeout(operationTimeout); + } + /** * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should not beÓ * called from production uses. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 5fc2d65fc7c..e8a969f66f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -911,9 +911,10 @@ public class HTable implements HTableInterface, RegionLocator { } } - public void batch(final List actions, final Object[] results, int timeout) + public void batch(final List actions, final Object[] results, int rpcTimeout) throws InterruptedException, IOException { - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout); + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, + operationTimeout, rpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1055,13 +1056,12 @@ public class HTable implements HTableInterface, RegionLocator { */ @Override public void mutateRow(final RowMutations rm) throws IOException { - final RetryingTimeTracker tracker = new RetryingTimeTracker(); + final RetryingTimeTracker tracker = new RetryingTimeTracker().start(); PayloadCarryingServerCallable callable = new PayloadCarryingServerCallable(connection, getName(), rm.getRow(), 
rpcControllerFactory) { @Override public MultiResponse call(int callTimeout) throws IOException { - tracker.start(); controller.setPriority(tableName); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { @@ -1091,7 +1091,7 @@ public class HTable implements HTableInterface, RegionLocator { } }; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, null, callable, operationTimeout); + null, null, callable, operationTimeout, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1364,13 +1364,12 @@ public class HTable implements HTableInterface, RegionLocator { public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { - final RetryingTimeTracker tracker = new RetryingTimeTracker(); + final RetryingTimeTracker tracker = new RetryingTimeTracker().start(); PayloadCarryingServerCallable callable = new PayloadCarryingServerCallable(connection, getName(), rm.getRow(), rpcControllerFactory) { @Override public MultiResponse call(int callTimeout) throws IOException { - tracker.start(); controller.setPriority(tableName); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { @@ -1404,7 +1403,7 @@ public class HTable implements HTableInterface, RegionLocator { * */ Object[] results = new Object[rm.getMutations().size()]; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, results, callable, operationTimeout); + null, results, callable, operationTimeout, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1809,6 +1808,10 @@ public class HTable implements HTableInterface, RegionLocator { public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; + if (mutator != null) { + mutator.setOperationTimeout(operationTimeout); 
+ } + multiAp.setOperationTimeout(operationTimeout); } public int getOperationTimeout() { @@ -1824,8 +1827,8 @@ public class HTable implements HTableInterface, RegionLocator { @Override @Deprecated public void setRpcTimeout(int rpcTimeout) { - this.readRpcTimeout = rpcTimeout; - this.writeRpcTimeout = rpcTimeout; + setWriteRpcTimeout(rpcTimeout); + setReadRpcTimeout(rpcTimeout); } @Override @@ -1836,6 +1839,10 @@ public class HTable implements HTableInterface, RegionLocator { @Override public void setWriteRpcTimeout(int writeRpcTimeout) { this.writeRpcTimeout = writeRpcTimeout; + if (mutator != null) { + mutator.setRpcTimeout(writeRpcTimeout); + } + multiAp.setRpcTimeout(writeRpcTimeout); } @Override @@ -1973,6 +1980,8 @@ public class HTable implements HTableInterface, RegionLocator { .writeBufferSize(connConfiguration.getWriteBufferSize()) .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) ); + mutator.setRpcTimeout(writeRpcTimeout); + mutator.setOperationTimeout(operationTimeout); } return mutator; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index d0b4c814474..115ba33567b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -52,9 +52,12 @@ import com.google.protobuf.ServiceException; class MultiServerCallable extends PayloadCarryingServerCallable { private final MultiAction multiAction; private final boolean cellBlock; + private final RetryingTimeTracker tracker; + private final int rpcTimeout; MultiServerCallable(final ClusterConnection connection, final TableName tableName, - final ServerName location, RpcControllerFactory rpcFactory, final MultiAction multi) { + final ServerName location, RpcControllerFactory rpcFactory, final MultiAction multi, + int rpcTimeout, RetryingTimeTracker 
tracker) { super(connection, tableName, null, rpcFactory); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. @@ -62,6 +65,8 @@ class MultiServerCallable extends PayloadCarryingServerCallable extends PayloadCarryingServerCallable AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results, - PayloadCarryingServerCallable callable, int curTimeout) { - previousTimeout = curTimeout; - return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout); + PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { + previousTimeout = rpcTimeout; + return super.submitAll(pool, tableName, rows, callback, results, callable, operationTimeout, + rpcTimeout); } @Override @@ -222,7 +223,7 @@ public class TestAsyncProcess { } @Override protected RpcRetryingCaller createCaller( - PayloadCarryingServerCallable callable) { + PayloadCarryingServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); MultiServerCallable callable1 = (MultiServerCallable) callable; final MultiResponse mr = createMultiResponse( @@ -285,7 +286,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller( - PayloadCarryingServerCallable callable) { + PayloadCarryingServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); return new CallerWithFailure(ioe); } @@ -336,7 +337,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller( - PayloadCarryingServerCallable payloadCallable) { + PayloadCarryingServerCallable payloadCallable, int rpcTimeout) { MultiServerCallable callable = (MultiServerCallable) payloadCallable; final MultiResponse mr = createMultiResponse( callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 8436563168d..f468c16d588 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.ServerTooBusyException; @@ -123,14 +124,14 @@ public class TestHCM { * This copro sleeps 20 second. The first call it fails. The second time, it works. */ public static class SleepAndFailFirstTime extends BaseRegionObserver { - static final AtomicLong ct = new AtomicLong(0); - static final String SLEEP_TIME_CONF_KEY = - "hbase.coprocessor.SleepAndFailFirstTime.sleepTime"; - static final long DEFAULT_SLEEP_TIME = 20000; - static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME); + static final AtomicLong ct = new AtomicLong(0); + static final String SLEEP_TIME_CONF_KEY = + "hbase.coprocessor.SleepAndFailFirstTime.sleepTime"; + static final long DEFAULT_SLEEP_TIME = 20000; + static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME); - public SleepAndFailFirstTime() { - } + public SleepAndFailFirstTime() { + } @Override public void postOpen(ObserverContext c) { @@ -141,12 +142,42 @@ public class TestHCM { @Override public void preGetOp(final ObserverContext e, - final Get get, final List results) throws IOException { + final Get get, final List results) throws IOException { Threads.sleep(sleepTime.get()); - if (ct.incrementAndGet() == 1){ + if (ct.incrementAndGet() == 1) { throw new IOException("first call I fail"); } } + + @Override + public void 
prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public void preDelete(final ObserverContext e, + final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public Result preIncrement(final ObserverContext e, + final Increment increment) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + return super.preIncrement(e, increment); + } + } public static class SleepCoprocessor extends BaseRegionObserver { @@ -162,16 +193,20 @@ public class TestHCM { final Put put, final WALEdit edit, final Durability durability) throws IOException { Threads.sleep(SLEEP_TIME); } - } - public static class SleepWriteCoprocessor extends BaseRegionObserver { - public static final int SLEEP_TIME = 5000; @Override public Result preIncrement(final ObserverContext e, - final Increment increment) throws IOException { + final Increment increment) throws IOException { Threads.sleep(SLEEP_TIME); return super.preIncrement(e, increment); } + + @Override + public void preDelete(final ObserverContext e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } + } public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver { @@ -365,11 +400,12 @@ public class TestHCM { * timeouted when the server answers. 
*/ @Test - public void testOperationTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout"); + public void testGetOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout"); hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); - HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); + Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); // Check that it works if the timeout is big enough table.setOperationTimeout(120 * 1000); table.get(new Get(FAM_NAM)); @@ -392,6 +428,62 @@ public class TestHCM { } } + @Test + public void testPutOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (RetriesExhaustedWithDetailsException e) { + // The client has a CallTimeout class, but it's not shared. We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout.
+ LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } finally { + table.close(); + } + } + + @Test + public void testDeleteOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.delete(new Delete(FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.delete(new Delete(FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (IOException e) { + // The client has a CallTimeout class, but it's not shared. We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout.
+ LOG.info("We received an exception, as expected ", e); + } finally { + table.close(); + } + } + @Test public void testRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); @@ -420,14 +512,14 @@ public class TestHCM { } @Test - public void testWriteRpcTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout"); - hdt.addCoprocessor(SleepWriteCoprocessor.class.getName()); + public void testIncrementRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testIncrementRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { - t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2); - t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100); + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); Increment i = new Increment(FAM_NAM); i.addColumn(FAM_NAM, FAM_NAM, 1); t.increment(i); @@ -437,7 +529,7 @@ public class TestHCM { } // Again, with configuration based override - c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2); + c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); try (Connection conn = ConnectionFactory.createConnection(c)) { try (Table t = conn.getTable(hdt.getTableName())) { Increment i = new Increment(FAM_NAM); @@ -451,8 +543,46 @@ public class TestHCM { } @Test - public void testReadRpcTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout"); + public void testDeleteRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new 
Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Delete d = new Delete(FAM_NAM); + d.addColumn(FAM_NAM, FAM_NAM, 1); + t.delete(d); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + } + + @Test + public void testPutRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Put p = new Put(FAM_NAM); + p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); + t.put(p); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + } + + @Test + public void testGetRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetRpcTimeout"); hdt.addCoprocessor(SleepCoprocessor.class.getName()); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); @@ -503,6 +633,7 @@ public class TestHCM { TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close(); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + SleepAndFailFirstTime.ct.set(0); c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000); c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000); @@ -1009,8 +1140,7 @@ public class TestHCM { curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + 
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); table.close(); }