HBASE-15866 Split hbase.rpc.timeout into *.read.timeout and *.write.timeout
Signed-off-by: Andrew Purtell <apurtell@apache.org> Amending-Author: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
4e08a8bec9
commit
30d7eeaefe
|
@ -281,7 +281,7 @@ class AsyncProcess {
|
||||||
|
|
||||||
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
|
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
|
||||||
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
|
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
|
||||||
RpcControllerFactory rpcFactory) {
|
RpcControllerFactory rpcFactory, int rpcTimeout) {
|
||||||
if (hc == null) {
|
if (hc == null) {
|
||||||
throw new IllegalArgumentException("ClusterConnection cannot be null.");
|
throw new IllegalArgumentException("ClusterConnection cannot be null.");
|
||||||
}
|
}
|
||||||
|
@ -297,8 +297,7 @@ class AsyncProcess {
|
||||||
// how many times we could try in total, one more than retry number
|
// how many times we could try in total, one more than retry number
|
||||||
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
|
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
|
||||||
this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
this.timeout = rpcTimeout;
|
||||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
|
||||||
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
|
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
|
||||||
|
|
||||||
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
||||||
|
|
|
@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
@ -72,6 +73,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
||||||
private final int maxKeyValueSize;
|
private final int maxKeyValueSize;
|
||||||
private boolean closed = false;
|
private boolean closed = false;
|
||||||
private final ExecutorService pool;
|
private final ExecutorService pool;
|
||||||
|
private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected AsyncProcess ap; // non-final so can be overridden in test
|
protected AsyncProcess ap; // non-final so can be overridden in test
|
||||||
|
@ -94,8 +96,12 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
||||||
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
|
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
|
||||||
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
|
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
|
||||||
|
|
||||||
|
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||||
|
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||||
|
|
||||||
// puts need to track errors globally due to how the APIs currently work.
|
// puts need to track errors globally due to how the APIs currently work.
|
||||||
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
|
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1823,7 +1823,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
// For tests to override.
|
// For tests to override.
|
||||||
protected AsyncProcess createAsyncProcess(Configuration conf) {
|
protected AsyncProcess createAsyncProcess(Configuration conf) {
|
||||||
// No default pool available.
|
// No default pool available.
|
||||||
return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory);
|
int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
|
return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -112,7 +112,8 @@ public class HTable implements Table {
|
||||||
protected long scannerMaxResultSize;
|
protected long scannerMaxResultSize;
|
||||||
private ExecutorService pool; // For Multi & Scan
|
private ExecutorService pool; // For Multi & Scan
|
||||||
private int operationTimeout; // global timeout for each blocking method with retrying rpc
|
private int operationTimeout; // global timeout for each blocking method with retrying rpc
|
||||||
private int rpcTimeout; // timeout for each rpc request
|
private int readRpcTimeout; // timeout for each read rpc request
|
||||||
|
private int writeRpcTimeout; // timeout for each write rpc request
|
||||||
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
|
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
|
||||||
private final boolean cleanupConnectionOnClose; // close the connection in close()
|
private final boolean cleanupConnectionOnClose; // close the connection in close()
|
||||||
private Consistency defaultConsistency = Consistency.STRONG;
|
private Consistency defaultConsistency = Consistency.STRONG;
|
||||||
|
@ -212,8 +213,12 @@ public class HTable implements Table {
|
||||||
|
|
||||||
this.operationTimeout = tableName.isSystemTable() ?
|
this.operationTimeout = tableName.isSystemTable() ?
|
||||||
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
|
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
|
||||||
this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
|
||||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||||
|
this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||||
|
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||||
this.scannerCaching = connConfiguration.getScannerCaching();
|
this.scannerCaching = connConfiguration.getScannerCaching();
|
||||||
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
||||||
if (this.rpcCallerFactory == null) {
|
if (this.rpcCallerFactory == null) {
|
||||||
|
@ -257,7 +262,7 @@ public class HTable implements Table {
|
||||||
@Override
|
@Override
|
||||||
public HTableDescriptor getTableDescriptor() throws IOException {
|
public HTableDescriptor getTableDescriptor() throws IOException {
|
||||||
HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
|
HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
|
||||||
rpcControllerFactory, operationTimeout, rpcTimeout);
|
rpcControllerFactory, operationTimeout, readRpcTimeout);
|
||||||
if (htd != null) {
|
if (htd != null) {
|
||||||
return new UnmodifyableHTableDescriptor(htd);
|
return new UnmodifyableHTableDescriptor(htd);
|
||||||
}
|
}
|
||||||
|
@ -430,7 +435,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
|
return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
|
||||||
this.operationTimeout);
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -528,7 +533,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
|
||||||
this.operationTimeout);
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -654,7 +659,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
|
return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
|
||||||
this.operationTimeout);
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -686,7 +691,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
|
return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
|
||||||
this.operationTimeout);
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -742,7 +747,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
|
return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable,
|
||||||
this.operationTimeout);
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -772,7 +777,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
|
||||||
this.operationTimeout);
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -803,7 +808,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
|
||||||
this.operationTimeout);
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -833,7 +838,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
|
||||||
this.operationTimeout);
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -864,7 +869,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
|
return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
|
||||||
this.operationTimeout);
|
this.operationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1196,13 +1201,34 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Deprecated
|
||||||
public int getRpcTimeout() {
|
public int getRpcTimeout() {
|
||||||
return rpcTimeout;
|
return readRpcTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Deprecated
|
||||||
public void setRpcTimeout(int rpcTimeout) {
|
public void setRpcTimeout(int rpcTimeout) {
|
||||||
this.rpcTimeout = rpcTimeout;
|
this.readRpcTimeout = rpcTimeout;
|
||||||
|
this.writeRpcTimeout = rpcTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteRpcTimeout() {
|
||||||
|
return writeRpcTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setWriteRpcTimeout(int writeRpcTimeout) {
|
||||||
|
this.writeRpcTimeout = writeRpcTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadRpcTimeout() { return readRpcTimeout; }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setReadRpcTimeout(int readRpcTimeout) {
|
||||||
|
this.readRpcTimeout = readRpcTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1282,7 +1308,7 @@ public class HTable implements Table {
|
||||||
AsyncProcess asyncProcess =
|
AsyncProcess asyncProcess =
|
||||||
new AsyncProcess(connection, configuration, pool,
|
new AsyncProcess(connection, configuration, pool,
|
||||||
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
|
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
|
||||||
true, RpcControllerFactory.instantiate(configuration));
|
true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
|
||||||
|
|
||||||
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
|
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
|
||||||
new Callback<ClientProtos.CoprocessorServiceResult>() {
|
new Callback<ClientProtos.CoprocessorServiceResult>() {
|
||||||
|
|
|
@ -442,6 +442,7 @@ public class HTableMultiplexer {
|
||||||
private final ScheduledExecutorService executor;
|
private final ScheduledExecutorService executor;
|
||||||
private final int maxRetryInQueue;
|
private final int maxRetryInQueue;
|
||||||
private final AtomicInteger retryInQueue = new AtomicInteger(0);
|
private final AtomicInteger retryInQueue = new AtomicInteger(0);
|
||||||
|
private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
|
||||||
|
|
||||||
public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
|
public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
|
||||||
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
|
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
|
||||||
|
@ -451,7 +452,10 @@ public class HTableMultiplexer {
|
||||||
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
|
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
|
||||||
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
|
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
|
||||||
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||||
this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
|
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||||
|
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||||
|
this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
|
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
|
||||||
}
|
}
|
||||||
|
|
|
@ -584,18 +584,57 @@ public interface Table extends Closeable {
|
||||||
*/
|
*/
|
||||||
int getOperationTimeout();
|
int getOperationTimeout();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get timeout (millisecond) of each rpc request in this Table instance.
|
||||||
|
*
|
||||||
|
* @returns Currently configured read timeout
|
||||||
|
* @deprecated Use getReadRpcTimeout or getWriteRpcTimeout instead
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
int getRpcTimeout();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set timeout (millisecond) of each rpc request in operations of this Table instance, will
|
* Set timeout (millisecond) of each rpc request in operations of this Table instance, will
|
||||||
* override the value of hbase.rpc.timeout in configuration.
|
* override the value of hbase.rpc.timeout in configuration.
|
||||||
* If a rpc request waiting too long, it will stop waiting and send a new request to retry until
|
* If a rpc request waiting too long, it will stop waiting and send a new request to retry until
|
||||||
* retries exhausted or operation timeout reached.
|
* retries exhausted or operation timeout reached.
|
||||||
|
* <p>
|
||||||
|
* NOTE: This will set both the read and write timeout settings to the provided value.
|
||||||
|
*
|
||||||
* @param rpcTimeout the timeout of each rpc request in millisecond.
|
* @param rpcTimeout the timeout of each rpc request in millisecond.
|
||||||
|
*
|
||||||
|
* @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
void setRpcTimeout(int rpcTimeout);
|
void setRpcTimeout(int rpcTimeout);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get timeout (millisecond) of each rpc request in this Table instance.
|
* Get timeout (millisecond) of each rpc read request in this Table instance.
|
||||||
*/
|
*/
|
||||||
int getRpcTimeout();
|
int getReadRpcTimeout();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set timeout (millisecond) of each rpc read request in operations of this Table instance, will
|
||||||
|
* override the value of hbase.rpc.read.timeout in configuration.
|
||||||
|
* If a rpc read request waiting too long, it will stop waiting and send a new request to retry
|
||||||
|
* until retries exhausted or operation timeout reached.
|
||||||
|
*
|
||||||
|
* @param readRpcTimeout
|
||||||
|
*/
|
||||||
|
void setReadRpcTimeout(int readRpcTimeout);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get timeout (millisecond) of each rpc write request in this Table instance.
|
||||||
|
*/
|
||||||
|
int getWriteRpcTimeout();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set timeout (millisecond) of each rpc write request in operations of this Table instance, will
|
||||||
|
* override the value of hbase.rpc.write.timeout in configuration.
|
||||||
|
* If a rpc write request waiting too long, it will stop waiting and send a new request to retry
|
||||||
|
* until retries exhausted or operation timeout reached.
|
||||||
|
*
|
||||||
|
* @param writeRpcTimeout
|
||||||
|
*/
|
||||||
|
void setWriteRpcTimeout(int writeRpcTimeout);
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,6 +138,7 @@ public class TestAsyncProcess {
|
||||||
final AtomicInteger nbActions = new AtomicInteger();
|
final AtomicInteger nbActions = new AtomicInteger();
|
||||||
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
|
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
|
||||||
public AtomicInteger callsCt = new AtomicInteger();
|
public AtomicInteger callsCt = new AtomicInteger();
|
||||||
|
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
|
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
|
||||||
|
@ -157,14 +158,14 @@ public class TestAsyncProcess {
|
||||||
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
|
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
|
||||||
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
|
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
|
||||||
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
|
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
|
||||||
new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
|
new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MyAsyncProcess(
|
public MyAsyncProcess(
|
||||||
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
|
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
|
||||||
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
|
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
|
||||||
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
|
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
|
||||||
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
|
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
|
public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
|
||||||
|
@ -176,7 +177,7 @@ public class TestAsyncProcess {
|
||||||
throw new RejectedExecutionException("test under failure");
|
throw new RejectedExecutionException("test under failure");
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
|
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1111,10 +1112,12 @@ public class TestAsyncProcess {
|
||||||
}
|
}
|
||||||
|
|
||||||
static class AsyncProcessForThrowableCheck extends AsyncProcess {
|
static class AsyncProcessForThrowableCheck extends AsyncProcess {
|
||||||
|
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
|
|
||||||
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
|
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
|
||||||
ExecutorService pool) {
|
ExecutorService pool) {
|
||||||
super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
|
super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
|
||||||
conf));
|
conf), rpcTimeout);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -815,9 +815,22 @@ public final class HConstants {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* timeout for each RPC
|
* timeout for each RPC
|
||||||
|
* @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY}
|
||||||
|
* instead.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
|
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* timeout for each read RPC
|
||||||
|
*/
|
||||||
|
public static final String HBASE_RPC_READ_TIMEOUT_KEY = "hbase.rpc.read.timeout";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* timeout for each write RPC
|
||||||
|
*/
|
||||||
|
public static final String HBASE_RPC_WRITE_TIMEOUT_KEY = "hbase.rpc.write.timeout";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default value of {@link #HBASE_RPC_TIMEOUT_KEY}
|
* Default value of {@link #HBASE_RPC_TIMEOUT_KEY}
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -860,12 +860,34 @@ public class RemoteHTable implements Table {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Deprecated
|
||||||
public void setRpcTimeout(int rpcTimeout) {
|
public void setRpcTimeout(int rpcTimeout) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Deprecated
|
||||||
public int getRpcTimeout() {
|
public int getRpcTimeout() {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadRpcTimeout() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setReadRpcTimeout(int readRpcTimeout) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteRpcTimeout() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setWriteRpcTimeout(int writeRpcTimeout) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -308,12 +308,26 @@ public final class HTableWrapper implements Table {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Deprecated
|
||||||
public void setRpcTimeout(int rpcTimeout) {
|
public void setRpcTimeout(int rpcTimeout) {
|
||||||
table.setRpcTimeout(rpcTimeout);
|
table.setRpcTimeout(rpcTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
public void setWriteRpcTimeout(int writeRpcTimeout) { table.setWriteRpcTimeout(writeRpcTimeout); }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setReadRpcTimeout(int readRpcTimeout) { table.setReadRpcTimeout(readRpcTimeout); }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Deprecated
|
||||||
public int getRpcTimeout() {
|
public int getRpcTimeout() {
|
||||||
return table.getRpcTimeout();
|
return table.getRpcTimeout();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteRpcTimeout() { return table.getWriteRpcTimeout(); }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadRpcTimeout() { return table.getReadRpcTimeout(); }
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.RegionLocations;
|
import org.apache.hadoop.hbase.RegionLocations;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
@ -126,7 +127,8 @@ public class HConnectionTestingUtility {
|
||||||
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
|
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
|
||||||
Mockito.when(c.getAsyncProcess()).thenReturn(
|
Mockito.when(c.getAsyncProcess()).thenReturn(
|
||||||
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
|
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
|
||||||
RpcControllerFactory.instantiate(conf)));
|
RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
|
||||||
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
|
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
|
||||||
RpcRetryingCallerFactory.instantiate(conf,
|
RpcRetryingCallerFactory.instantiate(conf,
|
||||||
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
|
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
|
||||||
|
@ -194,4 +196,4 @@ public class HConnectionTestingUtility {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
|
@ -149,6 +145,16 @@ public class TestHCM {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class SleepWriteCoprocessor extends BaseRegionObserver {
|
||||||
|
public static final int SLEEP_TIME = 5000;
|
||||||
|
@Override
|
||||||
|
public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
final Increment increment) throws IOException {
|
||||||
|
Threads.sleep(SLEEP_TIME);
|
||||||
|
return super.preIncrement(e, increment);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
|
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
|
||||||
|
@ -351,7 +357,7 @@ public class TestHCM {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = RetriesExhaustedException.class)
|
@Test
|
||||||
public void testRpcTimeout() throws Exception {
|
public void testRpcTimeout() throws Exception {
|
||||||
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
|
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
|
||||||
hdt.addCoprocessor(SleepCoprocessor.class.getName());
|
hdt.addCoprocessor(SleepCoprocessor.class.getName());
|
||||||
|
@ -361,6 +367,78 @@ public class TestHCM {
|
||||||
t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
|
t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
|
||||||
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
|
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
|
||||||
t.get(new Get(FAM_NAM));
|
t.get(new Get(FAM_NAM));
|
||||||
|
fail("Get should not have succeeded");
|
||||||
|
} catch (RetriesExhaustedException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
// Again, with configuration based override
|
||||||
|
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
|
||||||
|
try (Connection conn = ConnectionFactory.createConnection(c)) {
|
||||||
|
try (Table t = conn.getTable(hdt.getTableName())) {
|
||||||
|
t.get(new Get(FAM_NAM));
|
||||||
|
fail("Get should not have succeeded");
|
||||||
|
} catch (RetriesExhaustedException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteRpcTimeout() throws Exception {
|
||||||
|
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout");
|
||||||
|
hdt.addCoprocessor(SleepWriteCoprocessor.class.getName());
|
||||||
|
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
|
||||||
|
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
|
||||||
|
t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2);
|
||||||
|
t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100);
|
||||||
|
Increment i = new Increment(FAM_NAM);
|
||||||
|
i.addColumn(FAM_NAM, FAM_NAM, 1);
|
||||||
|
t.increment(i);
|
||||||
|
fail("Write should not have succeeded");
|
||||||
|
} catch (RetriesExhaustedException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
// Again, with configuration based override
|
||||||
|
c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2);
|
||||||
|
try (Connection conn = ConnectionFactory.createConnection(c)) {
|
||||||
|
try (Table t = conn.getTable(hdt.getTableName())) {
|
||||||
|
Increment i = new Increment(FAM_NAM);
|
||||||
|
i.addColumn(FAM_NAM, FAM_NAM, 1);
|
||||||
|
t.increment(i);
|
||||||
|
fail("Write should not have succeeded");
|
||||||
|
} catch (RetriesExhaustedException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadRpcTimeout() throws Exception {
|
||||||
|
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout");
|
||||||
|
hdt.addCoprocessor(SleepCoprocessor.class.getName());
|
||||||
|
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
|
||||||
|
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
|
||||||
|
t.setReadRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
|
||||||
|
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
|
||||||
|
t.get(new Get(FAM_NAM));
|
||||||
|
fail("Get should not have succeeded");
|
||||||
|
} catch (RetriesExhaustedException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
// Again, with configuration based override
|
||||||
|
c.setInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
|
||||||
|
try (Connection conn = ConnectionFactory.createConnection(c)) {
|
||||||
|
try (Table t = conn.getTable(hdt.getTableName())) {
|
||||||
|
t.get(new Get(FAM_NAM));
|
||||||
|
fail("Get should not have succeeded");
|
||||||
|
} catch (RetriesExhaustedException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -333,12 +333,26 @@ public class RegionAsTable implements Table {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Deprecated
|
||||||
public void setRpcTimeout(int rpcTimeout) {
|
public void setRpcTimeout(int rpcTimeout) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
public void setWriteRpcTimeout(int writeRpcTimeout) {throw new UnsupportedOperationException(); }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setReadRpcTimeout(int readRpcTimeout) {throw new UnsupportedOperationException(); }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Deprecated
|
||||||
public int getRpcTimeout() {
|
public int getRpcTimeout() {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteRpcTimeout() { throw new UnsupportedOperationException(); }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadRpcTimeout() { throw new UnsupportedOperationException(); }
|
||||||
}
|
}
|
Loading…
Reference in New Issue