diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f2d95460cb8..abefc4611c1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.client; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.common.annotations.VisibleForTesting; - /** * This class allows a continuous flow of requests. It's written to be compatible with a * synchronous caller such as HTable. @@ -212,7 +212,8 @@ class AsyncProcess { protected final long pause; protected int numTries; protected int serverTrackerTimeout; - protected int timeout; + protected int rpcTimeout; + protected int operationTimeout; protected long primaryCallTimeoutMicroseconds; /** Whether to log details for batch errors */ protected final boolean logBatchErrorDetails; @@ -220,7 +221,7 @@ class AsyncProcess { public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, - RpcControllerFactory rpcFactory, int rpcTimeout) { + RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } @@ -236,7 +237,8 @@ class AsyncProcess { // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; - this.timeout = rpcTimeout; + this.rpcTimeout = rpcTimeout; + this.operationTimeout = operationTimeout; this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, @@ -434,7 +436,7 @@ class AsyncProcess { List locationErrorRows, Map> actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, timeout); + tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { @@ -448,6 +450,14 @@ class AsyncProcess { return ars; } + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + } + + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + } + /** * Helper that is used when grouping the actions per region server. * @@ -473,7 +483,7 @@ class AsyncProcess { public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results) { - return submitAll(pool, tableName, rows, callback, results, null, timeout); + return submitAll(pool, tableName, rows, callback, results, null, -1); } /** * Submit immediately the list of rows, whatever the server status. Kept for backward @@ -484,10 +494,11 @@ class AsyncProcess { * @param rows the list of rows. * @param callback the callback. * @param results Optional array to return the results thru; backward compat. + * @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting. */ public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results, - CancellableRegionServerCallable callable, int curTimeout) { + CancellableRegionServerCallable callable, int rpcTimeout) { List> actions = new ArrayList>(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -507,7 +518,7 @@ class AsyncProcess { } AsyncRequestFutureImpl ars = createAsyncRequestFuture( tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, - callable, curTimeout); + callable, rpcTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -520,10 +531,11 @@ class AsyncProcess { protected AsyncRequestFutureImpl createAsyncRequestFuture( TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults, - CancellableRegionServerCallable callable, int curTimeout) { + CancellableRegionServerCallable callable, int rpcTimeout) { return new AsyncRequestFutureImpl( tableName, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout, this); + results, callback, callable, operationTimeout, + rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this); } /** Wait until the async does not have more than max tasks in progress. */ @@ -664,8 +676,8 @@ class AsyncProcess { */ @VisibleForTesting protected RpcRetryingCaller createCaller( - CancellableRegionServerCallable callable) { - return rpcCallerFactory. newCaller(); + CancellableRegionServerCallable callable, int rpcTimeout) { + return rpcCallerFactory. newCaller(rpcTimeout); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 3894d58e5c0..a2642f3198d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -20,6 +20,24 @@ package org.apache.hadoop.hbase.client; import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -39,23 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.Trace; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - /** * The context, and return value, for a single submit/submitAll call. * Note on how this class (one AP submit) works. Initially, all requests are split into groups @@ -70,6 +71,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { private static final Log LOG = LogFactory.getLog(AsyncRequestFutureImpl.class); + private RetryingTimeTracker tracker; + /** * Runnable (that can be submitted to thread pool) that waits for when it's time * to issue replica calls, finds region replicas, groups the requests by replica and @@ -219,12 +222,12 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { if (callable == null) { callable = createCallable(server, tableName, multiAction); } - RpcRetryingCaller caller = asyncProcess.createCaller(callable); + RpcRetryingCaller caller = asyncProcess.createCaller(callable,rpcTimeout); try { if (callsInProgress != null) { callsInProgress.add(callable); } - res = caller.callWithoutRetries(callable, currentCallTotalTimeout); + res = caller.callWithoutRetries(callable, operationTimeout); if (res == null) { // Cancelled return; @@ -297,7 +300,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { private final boolean hasAnyReplicaGets; private final long nonceGroup; private CancellableRegionServerCallable currentCallable; - private int currentCallTotalTimeout; + private int operationTimeout; + private int rpcTimeout; private final Map> heapSizesByServer = new HashMap<>(); protected AsyncProcess asyncProcess; @@ -337,10 +341,9 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback, - CancellableRegionServerCallable callable, int timeout, - AsyncProcess asyncProcess) { + ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback, + CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout, + AsyncProcess asyncProcess) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -410,9 +413,12 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { this.errorsByServer = createServerErrorTracker(); this.errors = (asyncProcess.globalErrors != null) ? asyncProcess.globalErrors : new BatchErrors(); + this.operationTimeout = operationTimeout; + this.rpcTimeout = rpcTimeout; this.currentCallable = callable; - this.currentCallTotalTimeout = timeout; - + if (callable == null) { + tracker = new RetryingTimeTracker().start(); + } } @VisibleForTesting @@ -1281,9 +1287,9 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { /** * Create a callable. Isolated to be easily overridden in the tests. */ - private MultiServerCallable createCallable(final ServerName server, - TableName tableName, final MultiAction multi) { + private MultiServerCallable createCallable(final ServerName server, TableName tableName, + final MultiAction multi) { return new MultiServerCallable(asyncProcess.connection, tableName, server, - multi, asyncProcess.rpcFactory.newController()); + multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker); } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index 5dc7fc3b415..fcc9af715e9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -119,6 +119,16 @@ public interface BufferedMutator extends Closeable { */ long getWriteBufferSize(); + /** + * Set rpc timeout for this mutator instance + */ + void setRpcTimeout(int timeout); + + /** + * Set operation timeout for this mutator instance + */ + void setOperationTimeout(int timeout); + /** * Listens for asynchronous exceptions on a {@link BufferedMutator}. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 2d4c8b36c8c..f7eb09deade 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -82,6 +82,7 @@ public class BufferedMutatorImpl implements BufferedMutator { private boolean closed = false; private final ExecutorService pool; private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor + private int operationTimeout; @VisibleForTesting protected AsyncProcess ap; // non-final so can be overridden in test @@ -107,9 +108,12 @@ public class BufferedMutatorImpl implements BufferedMutator { this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - + this.operationTimeout = conn.getConfiguration().getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout); + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, + writeRpcTimeout, operationTimeout); } @Override @@ -282,6 +286,18 @@ public class BufferedMutatorImpl implements BufferedMutator { return this.writeBufferSize; } + @Override + public void setRpcTimeout(int timeout) { + this.writeRpcTimeout = timeout; + ap.setRpcTimeout(timeout); + } + + @Override + public void setOperationTimeout(int timeout) { + this.operationTimeout = timeout; + ap.setOperationTimeout(operationTimeout); + } + private class QueueRowAccess implements RowAccess { private int remainder = undealtMutationCount.getAndSet(0); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java index 69f5b556e7d..a0ff9005f48 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java @@ -30,15 +30,20 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use * AsyncProcess directly though this class. Also adds global timeout tracking on top of * RegionServerCallable and implements Cancellable. + * Global timeout tracking conflicts with logic in RpcRetryingCallerImpl's callWithRetries. So you + * can only use this callable in AsyncProcess which only uses callWithoutRetries and retries in its + * own implementation. */ @InterfaceAudience.Private abstract class CancellableRegionServerCallable extends ClientServiceCallable implements Cancellable { - private final RetryingTimeTracker tracker = new RetryingTimeTracker(); - + private final RetryingTimeTracker tracker; + private final int rpcTimeout; CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row, - RpcController rpcController) { + RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) { super(connection, tableName, row, rpcController); + this.rpcTimeout = rpcTimeout; + this.tracker = tracker; } /* Override so can mess with the callTimeout. @@ -46,7 +51,7 @@ abstract class CancellableRegionServerCallable extends ClientServiceCallable< * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int) */ @Override - public T call(int callTimeout) throws IOException { + public T call(int operationTimeout) throws IOException { if (isCancelled()) return null; if (Thread.interrupted()) { throw new InterruptedIOException(); @@ -54,11 +59,12 @@ abstract class CancellableRegionServerCallable extends ClientServiceCallable< // It is expected (it seems) that tracker.start can be called multiple times (on each trip // through the call when retrying). Also, we can call start and no need of a stop. this.tracker.start(); - int remainingTime = tracker.getRemainingTime(callTimeout); - if (remainingTime == 0) { - throw new DoNotRetryIOException("Timeout for mutate row"); + int remainingTime = tracker.getRemainingTime(operationTimeout); + if (remainingTime <= 1) { + // "1" is a special return value in RetryingTimeTracker, see its implementation. + throw new DoNotRetryIOException("Operation rpcTimeout"); } - return super.call(remainingTime); + return super.call(Math.min(rpcTimeout, remainingTime)); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 8db9dbf0e9b..9cf63dcb06a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1831,8 +1831,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout); + int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, + rpcTimeout, operationTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 84f80241307..2802a2c430e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -441,7 +441,7 @@ public class HTable implements Table { RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( rpcControllerFactory, tableName, this.connection, get, pool, connConfiguration.getRetriesNumber(), - operationTimeout, + operationTimeout, readRpcTimeout, connConfiguration.getPrimaryCallTimeoutMicroSecond()); return callable.call(operationTimeout); } @@ -479,15 +479,10 @@ public class HTable implements Table { batch(actions, results, -1); } - public void batch(final List actions, final Object[] results, int timeout) + public void batch(final List actions, final Object[] results, int rpcTimeout) throws InterruptedException, IOException { - AsyncRequestFuture ars = null; - if (timeout != -1) { - ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout); - } else { - // use default timeout in AP - ars = multiAp.submitAll(pool, tableName, actions, null, results); - } + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, + rpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -523,7 +518,8 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable( - connection, getName(), delete.getRow(), this.rpcControllerFactory.newController()) { + connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(), + writeRpcTimeout, new RetryingTimeTracker().start()) { @Override protected SingleResponse rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -535,7 +531,7 @@ public class HTable implements Table { List rows = new ArrayList(); rows.add(delete); AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, null, callable, operationTimeout); + null, null, callable, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -593,7 +589,7 @@ public class HTable implements Table { public void mutateRow(final RowMutations rm) throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable(this.connection, getName(), rm.getRow(), - rpcControllerFactory.newController()) { + rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){ @Override protected MultiResponse rpcCall() throws Exception { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( @@ -614,7 +610,7 @@ public class HTable implements Table { } }; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, null, callable, operationTimeout); + null, null, callable, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -798,7 +794,8 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable( - this.connection, getName(), row, this.rpcControllerFactory.newController()) { + this.connection, getName(), row, this.rpcControllerFactory.newController(), + writeRpcTimeout, new RetryingTimeTracker().start()) { @Override protected SingleResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -814,7 +811,7 @@ public class HTable implements Table { Object[] results = new Object[1]; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, results, callable, operationTimeout); + null, results, callable, -1); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -831,7 +828,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable(connection, getName(), rm.getRow(), - rpcControllerFactory.newController()) { + rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) { @Override protected MultiResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -858,7 +855,7 @@ public class HTable implements Table { * */ Object[] results = new Object[rm.getMutations().size()]; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, results, callable, operationTimeout); + null, results, callable, -1); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1117,6 +1114,10 @@ public class HTable implements Table { @Override public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; + if (mutator != null) { + mutator.setOperationTimeout(operationTimeout); + } + multiAp.setOperationTimeout(operationTimeout); } @Override @@ -1133,8 +1134,8 @@ public class HTable implements Table { @Override @Deprecated public void setRpcTimeout(int rpcTimeout) { - this.readRpcTimeout = rpcTimeout; - this.writeRpcTimeout = rpcTimeout; + setReadRpcTimeout(rpcTimeout); + setWriteRpcTimeout(rpcTimeout); } @Override @@ -1145,6 +1146,10 @@ public class HTable implements Table { @Override public void setWriteRpcTimeout(int writeRpcTimeout) { this.writeRpcTimeout = writeRpcTimeout; + if (mutator != null) { + mutator.setRpcTimeout(writeRpcTimeout); + } + multiAp.setRpcTimeout(writeRpcTimeout); } @Override @@ -1229,7 +1234,8 @@ public class HTable implements Table { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration), readRpcTimeout); + true, RpcControllerFactory.instantiate(configuration), readRpcTimeout, + operationTimeout); AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs, new Callback() { @@ -1281,6 +1287,8 @@ public class HTable implements Table { .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) ); } + mutator.setRpcTimeout(writeRpcTimeout); + mutator.setOperationTimeout(operationTimeout); return mutator; } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 2c1a61e0c40..e8379efb179 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -442,6 +442,7 @@ public class HTableMultiplexer { private final int maxRetryInQueue; private final AtomicInteger retryInQueue = new AtomicInteger(0); private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor + private final int operationTimeout; public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, @@ -454,7 +455,10 @@ public class HTableMultiplexer { this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout); + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, + writeRpcTimeout, operationTimeout); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 6067ef0cb05..7d50a277acf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -50,12 +50,13 @@ import com.google.common.annotations.VisibleForTesting; */ @InterfaceAudience.Private class MultiServerCallable extends CancellableRegionServerCallable { - private final MultiAction multiAction; - private final boolean cellBlock; + private MultiAction multiAction; + private boolean cellBlock; MultiServerCallable(final ClusterConnection connection, final TableName tableName, - final ServerName location, final MultiAction multi, RpcController rpcController) { - super(connection, tableName, null, rpcController); + final ServerName location, final MultiAction multi, RpcController rpcController, + int rpcTimeout, RetryingTimeTracker tracker) { + super(connection, tableName, null, rpcController, rpcTimeout, tracker); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so @@ -64,6 +65,12 @@ class MultiServerCallable extends CancellableRegionServerCallable multiAction) { + this.location = new HRegionLocation(null, location); + this.multiAction = multiAction; + this.cellBlock = isCellBlock(); + } + @Override protected HRegionLocation getLocation() { throw new RuntimeException("Cannot get region location for multi-region request"); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index aff0205feb1..52ed26374ea 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; * @param the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class NoncedRegionServerCallable extends CancellableRegionServerCallable { +public abstract class NoncedRegionServerCallable extends ClientServiceCallable { private final long nonce; /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java index b9438e65893..e804e925460 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java @@ -24,10 +24,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; class RetryingTimeTracker { private long globalStartTime = -1; - public void start() { + public RetryingTimeTracker start() { if (this.globalStartTime < 0) { this.globalStartTime = EnvironmentEdgeManager.currentTime(); } + return this; } public int getRemainingTime(int callTimeout) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 04553d20ba2..a290c7811c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -58,7 +58,8 @@ public class RpcRetryingCallerWithReadReplicas { protected final Get get; protected final TableName tableName; protected final int timeBeforeReplicas; - private final int callTimeout; + private final int operationTimeout; + private final int rpcTimeout; private final int retries; private final RpcControllerFactory rpcControllerFactory; private final RpcRetryingCallerFactory rpcRetryingCallerFactory; @@ -66,7 +67,7 @@ public class RpcRetryingCallerWithReadReplicas { public RpcRetryingCallerWithReadReplicas( RpcControllerFactory rpcControllerFactory, TableName tableName, ClusterConnection cConnection, final Get get, - ExecutorService pool, int retries, int callTimeout, + ExecutorService pool, int retries, int operationTimeout, int rpcTimeout, int timeBeforeReplicas) { this.rpcControllerFactory = rpcControllerFactory; this.tableName = tableName; @@ -75,7 +76,8 @@ public class RpcRetryingCallerWithReadReplicas { this.get = get; this.pool = pool; this.retries = retries; - this.callTimeout = callTimeout; + this.operationTimeout = operationTimeout; + this.rpcTimeout = rpcTimeout; this.timeBeforeReplicas = timeBeforeReplicas; this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf); } @@ -91,7 +93,7 @@ public class RpcRetryingCallerWithReadReplicas { public ReplicaRegionServerCallable(int id, HRegionLocation location) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), - rpcControllerFactory.newController()); + rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker()); this.id = id; this.location = location; } @@ -133,7 +135,7 @@ public class RpcRetryingCallerWithReadReplicas { ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); HBaseRpcController hrc = (HBaseRpcController)getRpcController(); hrc.reset(); - hrc.setCallTimeout(callTimeout); + hrc.setCallTimeout(rpcTimeout); hrc.setPriority(tableName); ClientProtos.GetResponse response = getStub().get(hrc, request); if (response == null) { @@ -258,7 +260,7 @@ public class RpcRetryingCallerWithReadReplicas { for (int id = min; id <= max; id++) { HRegionLocation hrl = rl.getRegionLocation(id); ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); - cs.submit(callOnReplica, callTimeout, id); + cs.submit(callOnReplica, operationTimeout, id); } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 0703e51cf64..ed521a31ca2 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -152,7 +152,10 @@ public class TestAsyncProcess { final AtomicInteger nbActions = new AtomicInteger(); public List allReqs = new ArrayList(); public AtomicInteger callsCt = new AtomicInteger(); - private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); private long previousTimeout = -1; @Override protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, @@ -162,7 +165,7 @@ public class TestAsyncProcess { // Test HTable has tableName of null, so pass DUMMY_TABLE AsyncRequestFutureImpl r = new MyAsyncRequestFutureImpl( DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout, this); + results, callback, callable, operationTimeout, rpcTimeout, this); allReqs.add(r); return r; } @@ -174,14 +177,16 @@ public class TestAsyncProcess { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout); + new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout, + operationTimeout); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), + rpcTimeout, operationTimeout); } public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, @@ -193,7 +198,8 @@ public class TestAsyncProcess { throw new RejectedExecutionException("test under failure"); } }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), + rpcTimeout, operationTimeout); } @Override @@ -213,7 +219,7 @@ public class TestAsyncProcess { } @Override protected RpcRetryingCaller createCaller( - CancellableRegionServerCallable callable) { + CancellableRegionServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); MultiServerCallable callable1 = (MultiServerCallable) callable; final MultiResponse mr = createMultiResponse( @@ -254,12 +260,11 @@ public class TestAsyncProcess { static class MyAsyncRequestFutureImpl extends AsyncRequestFutureImpl { public MyAsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback, - CancellableRegionServerCallable callable, int timeout, - AsyncProcess asyncProcess) { + ExecutorService pool, boolean needResults, Object[] results, + Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout, + int rpcTimeout, AsyncProcess asyncProcess) { super(tableName, actions, nonceGroup, pool, needResults, - results, callback, callable, timeout, asyncProcess); + results, callback, callable, operationTimeout, rpcTimeout, asyncProcess); } @Override @@ -299,7 +304,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller( - CancellableRegionServerCallable callable) { + CancellableRegionServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); return new CallerWithFailure(ioe); } @@ -351,7 +356,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller( - CancellableRegionServerCallable payloadCallable) { + CancellableRegionServerCallable payloadCallable, int rpcTimeout) { MultiServerCallable callable = (MultiServerCallable) payloadCallable; final MultiResponse mr = createMultiResponse( callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @@ -1638,12 +1643,14 @@ public class TestAsyncProcess { } static class AsyncProcessForThrowableCheck extends AsyncProcess { - private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, ExecutorService pool) { super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf), rpcTimeout); + conf), rpcTimeout, operationTimeout); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 658fa9608e6..ee896095633 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -128,7 +128,9 @@ public class HConnectionTestingUtility { Mockito.when(c.getAsyncProcess()).thenReturn( new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT))); + HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index f9ebc47d1c9..3416e54676d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -135,12 +135,42 @@ public class TestHCM { @Override public void preGetOp(final ObserverContext e, - final Get get, final List results) throws IOException { + final Get get, final List results) throws IOException { Threads.sleep(sleepTime.get()); - if (ct.incrementAndGet() == 1){ + if (ct.incrementAndGet() == 1) { throw new IOException("first call I fail"); } } + + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public void preDelete(final ObserverContext e, + final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public Result preIncrement(final ObserverContext e, + final Increment increment) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + return super.preIncrement(e, increment); + } + } public static class SleepCoprocessor extends BaseRegionObserver { @@ -156,16 +186,20 @@ public class TestHCM { final Put put, final WALEdit edit, final Durability durability) throws IOException { Threads.sleep(SLEEP_TIME); } - } - public static class SleepWriteCoprocessor extends BaseRegionObserver { - public static final int SLEEP_TIME = 5000; @Override public Result preIncrement(final ObserverContext e, final Increment increment) throws IOException { Threads.sleep(SLEEP_TIME); return super.preIncrement(e, increment); } + + @Override + public void preDelete(final ObserverContext e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } + } public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver { @@ -364,11 +398,12 @@ public class TestHCM { * timeouted when the server answers. */ @Test - public void testOperationTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout"); + public void testGetOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout"); hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); - Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}); + Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); // Check that it works if the timeout is big enough table.setOperationTimeout(120 * 1000); table.get(new Get(FAM_NAM)); @@ -391,6 +426,64 @@ public class TestHCM { } } + @Test + public void testPutOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (RetriesExhaustedWithDetailsException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } finally { + table.close(); + } + } + + @Test + public void testDeleteOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.delete(new Delete(FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.delete(new Delete(FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (RetriesExhaustedWithDetailsException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } finally { + table.close(); + } + } + @Test public void testRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); @@ -419,14 +512,14 @@ public class TestHCM { } @Test - public void testWriteRpcTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout"); - hdt.addCoprocessor(SleepWriteCoprocessor.class.getName()); + public void testIncrementRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testIncrementRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { - t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2); - t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100); + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); Increment i = new Increment(FAM_NAM); i.addColumn(FAM_NAM, FAM_NAM, 1); t.increment(i); @@ -436,7 +529,7 @@ public class TestHCM { } // Again, with configuration based override - c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2); + c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); try (Connection conn = ConnectionFactory.createConnection(c)) { try (Table t = conn.getTable(hdt.getTableName())) { Increment i = new Increment(FAM_NAM); @@ -450,8 +543,46 @@ public class TestHCM { } @Test - public void testReadRpcTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout"); + public void testDeleteRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Delete d = new Delete(FAM_NAM); + d.addColumn(FAM_NAM, FAM_NAM, 1); + t.delete(d); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + } + + @Test + public void testPutRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Put p = new Put(FAM_NAM); + p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); + t.put(p); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + } + + @Test + public void testGetRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetRpcTimeout"); hdt.addCoprocessor(SleepCoprocessor.class.getName()); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); @@ -502,6 +633,7 @@ public class TestHCM { TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close(); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + SleepAndFailFirstTime.ct.set(0); c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000); c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000); @@ -1013,8 +1145,7 @@ public class TestHCM { curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); table.close(); connection.close(); }