HBASE-16664 Timeout logic in AsyncProcess is broken

Signed-off-by: chenheng <chenheng@apache.org>
This commit is contained in:
Phil Yang 2016-10-09 15:25:11 +08:00 committed by chenheng
parent f11aa4542f
commit 88ff71b91b
15 changed files with 338 additions and 122 deletions

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.annotations.VisibleForTesting;
/**
* This class allows a continuous flow of requests. It's written to be compatible with a
* synchronous caller such as HTable.
@ -212,7 +212,8 @@ class AsyncProcess {
protected final long pause;
protected int numTries;
protected int serverTrackerTimeout;
protected int timeout;
protected int rpcTimeout;
protected int operationTimeout;
protected long primaryCallTimeoutMicroseconds;
/** Whether to log details for batch errors */
protected final boolean logBatchErrorDetails;
@ -220,7 +221,7 @@ class AsyncProcess {
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
RpcControllerFactory rpcFactory, int rpcTimeout) {
RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) {
if (hc == null) {
throw new IllegalArgumentException("ClusterConnection cannot be null.");
}
@ -236,7 +237,8 @@ class AsyncProcess {
// how many times we could try in total, one more than retry number
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
this.timeout = rpcTimeout;
this.rpcTimeout = rpcTimeout;
this.operationTimeout = operationTimeout;
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
@ -434,7 +436,7 @@ class AsyncProcess {
List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, timeout);
tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1);
// Add location errors if any
if (locationErrors != null) {
for (int i = 0; i < locationErrors.size(); ++i) {
@ -448,6 +450,14 @@ class AsyncProcess {
return ars;
}
public void setRpcTimeout(int rpcTimeout) {
this.rpcTimeout = rpcTimeout;
}
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
}
/**
* Helper that is used when grouping the actions per region server.
*
@ -473,7 +483,7 @@ class AsyncProcess {
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
return submitAll(pool, tableName, rows, callback, results, null, timeout);
return submitAll(pool, tableName, rows, callback, results, null, -1);
}
/**
* Submit immediately the list of rows, whatever the server status. Kept for backward
@ -484,10 +494,11 @@ class AsyncProcess {
* @param rows the list of rows.
* @param callback the callback.
* @param results Optional array to return the results thru; backward compat.
* @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting.
*/
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
CancellableRegionServerCallable callable, int curTimeout) {
CancellableRegionServerCallable callable, int rpcTimeout) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
// The position will be used by the processBatch to match the object array returned.
@ -507,7 +518,7 @@ class AsyncProcess {
}
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
callable, curTimeout);
callable, rpcTimeout);
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
@ -520,10 +531,11 @@ class AsyncProcess {
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults,
CancellableRegionServerCallable callable, int curTimeout) {
CancellableRegionServerCallable callable, int rpcTimeout) {
return new AsyncRequestFutureImpl<CResult>(
tableName, actions, nonceGroup, getPool(pool), needResults,
results, callback, callable, curTimeout, this);
results, callback, callable, operationTimeout,
rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this);
}
/** Wait until the async does not have more than max tasks in progress. */
@ -664,8 +676,8 @@ class AsyncProcess {
*/
@VisibleForTesting
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
return rpcCallerFactory.<AbstractResponse> newCaller();
CancellableRegionServerCallable callable, int rpcTimeout) {
return rpcCallerFactory.<AbstractResponse> newCaller(rpcTimeout);
}

View File

@ -20,6 +20,24 @@
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -39,23 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Trace;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* The context, and return value, for a single submit/submitAll call.
* Note on how this class (one AP submit) works. Initially, all requests are split into groups
@ -70,6 +71,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private static final Log LOG = LogFactory.getLog(AsyncRequestFutureImpl.class);
private RetryingTimeTracker tracker;
/**
* Runnable (that can be submitted to thread pool) that waits for when it's time
* to issue replica calls, finds region replicas, groups the requests by replica and
@ -219,12 +222,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (callable == null) {
callable = createCallable(server, tableName, multiAction);
}
RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable);
RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable,rpcTimeout);
try {
if (callsInProgress != null) {
callsInProgress.add(callable);
}
res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
res = caller.callWithoutRetries(callable, operationTimeout);
if (res == null) {
// Cancelled
return;
@ -297,7 +300,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
private CancellableRegionServerCallable currentCallable;
private int currentCallTotalTimeout;
private int operationTimeout;
private int rpcTimeout;
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
protected AsyncProcess asyncProcess;
@ -337,9 +341,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
Batch.Callback<CResult> callback,
CancellableRegionServerCallable callable, int timeout,
ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback,
CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout,
AsyncProcess asyncProcess) {
this.pool = pool;
this.callback = callback;
@ -410,9 +413,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
this.errorsByServer = createServerErrorTracker();
this.errors = (asyncProcess.globalErrors != null)
? asyncProcess.globalErrors : new BatchErrors();
this.operationTimeout = operationTimeout;
this.rpcTimeout = rpcTimeout;
this.currentCallable = callable;
this.currentCallTotalTimeout = timeout;
if (callable == null) {
tracker = new RetryingTimeTracker().start();
}
}
@VisibleForTesting
@ -1281,9 +1287,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
/**
* Create a callable. Isolated to be easily overridden in the tests.
*/
private MultiServerCallable<Row> createCallable(final ServerName server,
TableName tableName, final MultiAction<Row> multi) {
private MultiServerCallable<Row> createCallable(final ServerName server, TableName tableName,
final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(asyncProcess.connection, tableName, server,
multi, asyncProcess.rpcFactory.newController());
multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
}
}

View File

@ -119,6 +119,16 @@ public interface BufferedMutator extends Closeable {
*/
long getWriteBufferSize();
/**
* Set rpc timeout for this mutator instance
*/
void setRpcTimeout(int timeout);
/**
* Set operation timeout for this mutator instance
*/
void setOperationTimeout(int timeout);
/**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/

View File

@ -82,6 +82,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
private boolean closed = false;
private final ExecutorService pool;
private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
private int operationTimeout;
@VisibleForTesting
protected AsyncProcess ap; // non-final so can be overridden in test
@ -107,9 +108,12 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.operationTimeout = conn.getConfiguration().getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory,
writeRpcTimeout, operationTimeout);
}
@Override
@ -282,6 +286,18 @@ public class BufferedMutatorImpl implements BufferedMutator {
return this.writeBufferSize;
}
@Override
public void setRpcTimeout(int timeout) {
this.writeRpcTimeout = timeout;
ap.setRpcTimeout(timeout);
}
@Override
public void setOperationTimeout(int timeout) {
this.operationTimeout = timeout;
ap.setOperationTimeout(operationTimeout);
}
private class QueueRowAccess implements RowAccess<Row> {
private int remainder = undealtMutationCount.getAndSet(0);

View File

@ -30,15 +30,20 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
* This class is used to unify HTable calls with AsyncProcess Framework. HTable can use
* AsyncProcess directly though this class. Also adds global timeout tracking on top of
* RegionServerCallable and implements Cancellable.
* Global timeout tracking conflicts with logic in RpcRetryingCallerImpl's callWithRetries. So you
* can only use this callable in AsyncProcess which only uses callWithoutRetries and retries in its
* own implementation.
*/
@InterfaceAudience.Private
abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<T> implements
Cancellable {
private final RetryingTimeTracker tracker = new RetryingTimeTracker();
private final RetryingTimeTracker tracker;
private final int rpcTimeout;
CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
RpcController rpcController) {
RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) {
super(connection, tableName, row, rpcController);
this.rpcTimeout = rpcTimeout;
this.tracker = tracker;
}
/* Override so can mess with the callTimeout.
@ -46,7 +51,7 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
* @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int)
*/
@Override
public T call(int callTimeout) throws IOException {
public T call(int operationTimeout) throws IOException {
if (isCancelled()) return null;
if (Thread.interrupted()) {
throw new InterruptedIOException();
@ -54,11 +59,12 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
// It is expected (it seems) that tracker.start can be called multiple times (on each trip
// through the call when retrying). Also, we can call start and no need of a stop.
this.tracker.start();
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
throw new DoNotRetryIOException("Timeout for mutate row");
int remainingTime = tracker.getRemainingTime(operationTimeout);
if (remainingTime <= 1) {
// "1" is a special return value in RetryingTimeTracker, see its implementation.
throw new DoNotRetryIOException("Operation rpcTimeout");
}
return super.call(remainingTime);
return super.call(Math.min(rpcTimeout, remainingTime));
}
@Override

View File

@ -1831,8 +1831,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// For tests to override.
protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available.
int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout);
int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory,
rpcTimeout, operationTimeout);
}
@Override

View File

@ -441,7 +441,7 @@ public class HTable implements Table {
RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
rpcControllerFactory, tableName, this.connection, get, pool,
connConfiguration.getRetriesNumber(),
operationTimeout,
operationTimeout, readRpcTimeout,
connConfiguration.getPrimaryCallTimeoutMicroSecond());
return callable.call(operationTimeout);
}
@ -479,15 +479,10 @@ public class HTable implements Table {
batch(actions, results, -1);
}
public void batch(final List<? extends Row> actions, final Object[] results, int timeout)
public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
throws InterruptedException, IOException {
AsyncRequestFuture ars = null;
if (timeout != -1) {
ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout);
} else {
// use default timeout in AP
ars = multiAp.submitAll(pool, tableName, actions, null, results);
}
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null,
rpcTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@ -523,7 +518,8 @@ public class HTable implements Table {
throws IOException {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
connection, getName(), delete.getRow(), this.rpcControllerFactory.newController()) {
connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(),
writeRpcTimeout, new RetryingTimeTracker().start()) {
@Override
protected SingleResponse rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@ -535,7 +531,7 @@ public class HTable implements Table {
List<Row> rows = new ArrayList<Row>();
rows.add(delete);
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
null, null, callable, operationTimeout);
null, null, callable, writeRpcTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@ -593,7 +589,7 @@ public class HTable implements Table {
public void mutateRow(final RowMutations rm) throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
rpcControllerFactory.newController()) {
rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){
@Override
protected MultiResponse rpcCall() throws Exception {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
@ -614,7 +610,7 @@ public class HTable implements Table {
}
};
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
null, null, callable, operationTimeout);
null, null, callable, writeRpcTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@ -798,7 +794,8 @@ public class HTable implements Table {
throws IOException {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
this.connection, getName(), row, this.rpcControllerFactory.newController()) {
this.connection, getName(), row, this.rpcControllerFactory.newController(),
writeRpcTimeout, new RetryingTimeTracker().start()) {
@Override
protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@ -814,7 +811,7 @@ public class HTable implements Table {
Object[] results = new Object[1];
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
null, results, callable, operationTimeout);
null, results, callable, -1);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@ -831,7 +828,7 @@ public class HTable implements Table {
throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory.newController()) {
rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) {
@Override
protected MultiResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@ -858,7 +855,7 @@ public class HTable implements Table {
* */
Object[] results = new Object[rm.getMutations().size()];
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
null, results, callable, operationTimeout);
null, results, callable, -1);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@ -1117,6 +1114,10 @@ public class HTable implements Table {
@Override
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
if (mutator != null) {
mutator.setOperationTimeout(operationTimeout);
}
multiAp.setOperationTimeout(operationTimeout);
}
@Override
@ -1133,8 +1134,8 @@ public class HTable implements Table {
@Override
@Deprecated
public void setRpcTimeout(int rpcTimeout) {
this.readRpcTimeout = rpcTimeout;
this.writeRpcTimeout = rpcTimeout;
setReadRpcTimeout(rpcTimeout);
setWriteRpcTimeout(rpcTimeout);
}
@Override
@ -1145,6 +1146,10 @@ public class HTable implements Table {
@Override
public void setWriteRpcTimeout(int writeRpcTimeout) {
this.writeRpcTimeout = writeRpcTimeout;
if (mutator != null) {
mutator.setRpcTimeout(writeRpcTimeout);
}
multiAp.setRpcTimeout(writeRpcTimeout);
}
@Override
@ -1229,7 +1234,8 @@ public class HTable implements Table {
AsyncProcess asyncProcess =
new AsyncProcess(connection, configuration, pool,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
true, RpcControllerFactory.instantiate(configuration), readRpcTimeout,
operationTimeout);
AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
new Callback<ClientProtos.CoprocessorServiceResult>() {
@ -1281,6 +1287,8 @@ public class HTable implements Table {
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
);
}
mutator.setRpcTimeout(writeRpcTimeout);
mutator.setOperationTimeout(operationTimeout);
return mutator;
}
}

View File

@ -442,6 +442,7 @@ public class HTableMultiplexer {
private final int maxRetryInQueue;
private final AtomicInteger retryInQueue = new AtomicInteger(0);
private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
private final int operationTimeout;
public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
@ -454,7 +455,10 @@ public class HTableMultiplexer {
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory,
writeRpcTimeout, operationTimeout);
this.executor = executor;
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
}

View File

@ -50,12 +50,13 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiResponse> {
private final MultiAction<R> multiAction;
private final boolean cellBlock;
private MultiAction<R> multiAction;
private boolean cellBlock;
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, final MultiAction<R> multi, RpcController rpcController) {
super(connection, tableName, null, rpcController);
final ServerName location, final MultiAction<R> multi, RpcController rpcController,
int rpcTimeout, RetryingTimeTracker tracker) {
super(connection, tableName, null, rpcController, rpcTimeout, tracker);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
@ -64,6 +65,12 @@ class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiRespon
this.cellBlock = isCellBlock();
}
public void reset(ServerName location, MultiAction<R> multiAction) {
this.location = new HRegionLocation(null, location);
this.multiAction = multiAction;
this.cellBlock = isCellBlock();
}
@Override
protected HRegionLocation getLocation() {
throw new RuntimeException("Cannot get region location for multi-region request");

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
* @param <T> the class that the ServerCallable handles
*/
@InterfaceAudience.Private
public abstract class NoncedRegionServerCallable<T> extends CancellableRegionServerCallable<T> {
public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallable<T> {
private final long nonce;
/**

View File

@ -24,10 +24,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
class RetryingTimeTracker {
private long globalStartTime = -1;
public void start() {
public RetryingTimeTracker start() {
if (this.globalStartTime < 0) {
this.globalStartTime = EnvironmentEdgeManager.currentTime();
}
return this;
}
public int getRemainingTime(int callTimeout) {

View File

@ -58,7 +58,8 @@ public class RpcRetryingCallerWithReadReplicas {
protected final Get get;
protected final TableName tableName;
protected final int timeBeforeReplicas;
private final int callTimeout;
private final int operationTimeout;
private final int rpcTimeout;
private final int retries;
private final RpcControllerFactory rpcControllerFactory;
private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
@ -66,7 +67,7 @@ public class RpcRetryingCallerWithReadReplicas {
public RpcRetryingCallerWithReadReplicas(
RpcControllerFactory rpcControllerFactory, TableName tableName,
ClusterConnection cConnection, final Get get,
ExecutorService pool, int retries, int callTimeout,
ExecutorService pool, int retries, int operationTimeout, int rpcTimeout,
int timeBeforeReplicas) {
this.rpcControllerFactory = rpcControllerFactory;
this.tableName = tableName;
@ -75,7 +76,8 @@ public class RpcRetryingCallerWithReadReplicas {
this.get = get;
this.pool = pool;
this.retries = retries;
this.callTimeout = callTimeout;
this.operationTimeout = operationTimeout;
this.rpcTimeout = rpcTimeout;
this.timeBeforeReplicas = timeBeforeReplicas;
this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
}
@ -91,7 +93,7 @@ public class RpcRetryingCallerWithReadReplicas {
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
rpcControllerFactory.newController());
rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker());
this.id = id;
this.location = location;
}
@ -133,7 +135,7 @@ public class RpcRetryingCallerWithReadReplicas {
ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
HBaseRpcController hrc = (HBaseRpcController)getRpcController();
hrc.reset();
hrc.setCallTimeout(callTimeout);
hrc.setCallTimeout(rpcTimeout);
hrc.setPriority(tableName);
ClientProtos.GetResponse response = getStub().get(hrc, request);
if (response == null) {
@ -258,7 +260,7 @@ public class RpcRetryingCallerWithReadReplicas {
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
cs.submit(callOnReplica, callTimeout, id);
cs.submit(callOnReplica, operationTimeout, id);
}
}

View File

@ -152,7 +152,10 @@ public class TestAsyncProcess {
final AtomicInteger nbActions = new AtomicInteger();
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
public AtomicInteger callsCt = new AtomicInteger();
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
private long previousTimeout = -1;
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
@ -162,7 +165,7 @@ public class TestAsyncProcess {
// Test HTable has tableName of null, so pass DUMMY_TABLE
AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<Res>(
DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults,
results, callback, callable, curTimeout, this);
results, callback, callable, operationTimeout, rpcTimeout, this);
allReqs.add(r);
return r;
}
@ -174,14 +177,16 @@ public class TestAsyncProcess {
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout);
new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout,
operationTimeout);
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
rpcTimeout, operationTimeout);
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
@ -193,7 +198,8 @@ public class TestAsyncProcess {
throw new RejectedExecutionException("test under failure");
}
},
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
rpcTimeout, operationTimeout);
}
@Override
@ -213,7 +219,7 @@ public class TestAsyncProcess {
}
@Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
CancellableRegionServerCallable callable, int rpcTimeout) {
callsCt.incrementAndGet();
MultiServerCallable callable1 = (MultiServerCallable) callable;
final MultiResponse mr = createMultiResponse(
@ -255,11 +261,10 @@ public class TestAsyncProcess {
public MyAsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
Batch.Callback callback,
CancellableRegionServerCallable callable, int timeout,
AsyncProcess asyncProcess) {
Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout,
int rpcTimeout, AsyncProcess asyncProcess) {
super(tableName, actions, nonceGroup, pool, needResults,
results, callback, callable, timeout, asyncProcess);
results, callback, callable, operationTimeout, rpcTimeout, asyncProcess);
}
@Override
@ -299,7 +304,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
CancellableRegionServerCallable callable, int rpcTimeout) {
callsCt.incrementAndGet();
return new CallerWithFailure(ioe);
}
@ -351,7 +356,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable payloadCallable) {
CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
@ -1638,12 +1643,14 @@ public class TestAsyncProcess {
}
static class AsyncProcessForThrowableCheck extends AsyncProcess {
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
ExecutorService pool) {
super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
conf), rpcTimeout);
conf), rpcTimeout, operationTimeout);
}
}

View File

@ -128,7 +128,9 @@ public class HConnectionTestingUtility {
Mockito.when(c.getAsyncProcess()).thenReturn(
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)));
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
RpcRetryingCallerFactory.instantiate(conf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));

View File

@ -141,6 +141,36 @@ public class TestHCM {
throw new IOException("first call I fail");
}
}
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(sleepTime.get());
if (ct.incrementAndGet() == 1) {
throw new IOException("first call I fail");
}
}
@Override
public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
final Delete delete,
final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(sleepTime.get());
if (ct.incrementAndGet() == 1) {
throw new IOException("first call I fail");
}
}
@Override
public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment) throws IOException {
Threads.sleep(sleepTime.get());
if (ct.incrementAndGet() == 1) {
throw new IOException("first call I fail");
}
return super.preIncrement(e, increment);
}
}
public static class SleepCoprocessor extends BaseRegionObserver {
@ -156,16 +186,20 @@ public class TestHCM {
final Put put, final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(SLEEP_TIME);
}
}
public static class SleepWriteCoprocessor extends BaseRegionObserver {
public static final int SLEEP_TIME = 5000;
@Override
public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment) throws IOException {
Threads.sleep(SLEEP_TIME);
return super.preIncrement(e, increment);
}
@Override
public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete,
final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(SLEEP_TIME);
}
}
public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver {
@ -364,11 +398,12 @@ public class TestHCM {
* timeouted when the server answers.
*/
@Test
public void testOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
public void testGetOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout");
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM});
Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
table.setRpcTimeout(Integer.MAX_VALUE);
SleepAndFailFirstTime.ct.set(0);
// Check that it works if the timeout is big enough
table.setOperationTimeout(120 * 1000);
table.get(new Get(FAM_NAM));
@ -391,6 +426,64 @@ public class TestHCM {
}
}
@Test
public void testPutOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout");
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration());
table.setRpcTimeout(Integer.MAX_VALUE);
SleepAndFailFirstTime.ct.set(0);
// Check that it works if the timeout is big enough
table.setOperationTimeout(120 * 1000);
table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
// Resetting and retrying. Will fail this time, not enough time for the second try
SleepAndFailFirstTime.ct.set(0);
try {
table.setOperationTimeout(30 * 1000);
table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
Assert.fail("We expect an exception here");
} catch (RetriesExhaustedWithDetailsException e) {
// The client has a CallTimeout class, but it's not shared.We're not very clean today,
// in the general case you can expect the call to stop, but the exception may vary.
// In this test however, we're sure that it will be a socket timeout.
LOG.info("We received an exception, as expected ", e);
} catch (IOException e) {
Assert.fail("Wrong exception:" + e.getMessage());
} finally {
table.close();
}
}
@Test
public void testDeleteOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout");
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration());
table.setRpcTimeout(Integer.MAX_VALUE);
SleepAndFailFirstTime.ct.set(0);
// Check that it works if the timeout is big enough
table.setOperationTimeout(120 * 1000);
table.delete(new Delete(FAM_NAM));
// Resetting and retrying. Will fail this time, not enough time for the second try
SleepAndFailFirstTime.ct.set(0);
try {
table.setOperationTimeout(30 * 1000);
table.delete(new Delete(FAM_NAM));
Assert.fail("We expect an exception here");
} catch (RetriesExhaustedWithDetailsException e) {
// The client has a CallTimeout class, but it's not shared.We're not very clean today,
// in the general case you can expect the call to stop, but the exception may vary.
// In this test however, we're sure that it will be a socket timeout.
LOG.info("We received an exception, as expected ", e);
} catch (IOException e) {
Assert.fail("Wrong exception:" + e.getMessage());
} finally {
table.close();
}
}
@Test
public void testRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
@ -419,14 +512,14 @@ public class TestHCM {
}
@Test
public void testWriteRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout");
hdt.addCoprocessor(SleepWriteCoprocessor.class.getName());
public void testIncrementRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testIncrementRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100);
t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
Increment i = new Increment(FAM_NAM);
i.addColumn(FAM_NAM, FAM_NAM, 1);
t.increment(i);
@ -436,7 +529,7 @@ public class TestHCM {
}
// Again, with configuration based override
c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2);
c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
try (Connection conn = ConnectionFactory.createConnection(c)) {
try (Table t = conn.getTable(hdt.getTableName())) {
Increment i = new Increment(FAM_NAM);
@ -450,8 +543,46 @@ public class TestHCM {
}
@Test
public void testReadRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout");
public void testDeleteRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
Delete d = new Delete(FAM_NAM);
d.addColumn(FAM_NAM, FAM_NAM, 1);
t.delete(d);
fail("Write should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
}
@Test
public void testPutRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
Put p = new Put(FAM_NAM);
p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
t.put(p);
fail("Write should not have succeeded");
} catch (RetriesExhaustedException e) {
// expected
}
}
@Test
public void testGetRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
@ -502,6 +633,7 @@ public class TestHCM {
TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
SleepAndFailFirstTime.ct.set(0);
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
@ -1013,8 +1145,7 @@ public class TestHCM {
curServer.getServerName().getPort(),
conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
table.close();
connection.close();
}