HubSpot Backport: HBASE-27078 Allow configuring a separate timeout for meta scans (#4585)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Bryan Beaudreault 2022-07-07 16:33:16 -04:00
parent 9d85d622a1
commit 7023542cce
16 changed files with 634 additions and 67 deletions

View File

@ -23,6 +23,9 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PE
import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -56,6 +59,9 @@ class AsyncConnectionConfiguration {
// timeout for each read rpc request // timeout for each read rpc request
private final long readRpcTimeoutNs; private final long readRpcTimeoutNs;
// timeout for each read rpc request against system tables
private final long metaReadRpcTimeoutNs;
// timeout for each write rpc request // timeout for each write rpc request
private final long writeRpcTimeoutNs; private final long writeRpcTimeoutNs;
@ -73,6 +79,7 @@ class AsyncConnectionConfiguration {
// client that it is still alive. The scan timeout is used as operation timeout for every // client that it is still alive. The scan timeout is used as operation timeout for every
// operations in a scan, such as openScanner or next. // operations in a scan, such as openScanner or next.
private final long scanTimeoutNs; private final long scanTimeoutNs;
private final long metaScanTimeoutNs;
private final int scannerCaching; private final int scannerCaching;
@ -110,12 +117,13 @@ class AsyncConnectionConfiguration {
connectionConf.getMetaOperationTimeout()); connectionConf.getMetaOperationTimeout());
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getOperationTimeout()); this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getOperationTimeout());
this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getRpcTimeout()); this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getRpcTimeout());
this.readRpcTimeoutNs = long readRpcTimeoutMillis =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, connectionConf.getRpcTimeout());
connectionConf.getReadRpcTimeout())); this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis);
this.writeRpcTimeoutNs = this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis));
connectionConf.getWriteRpcTimeout())); this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, connectionConf.getWriteRpcTimeout()));
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getPauseMillis()); this.pauseNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getPauseMillis());
this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos( this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(
connectionConf.getPauseMillisForServerOverloaded()); connectionConf.getPauseMillisForServerOverloaded());
@ -125,9 +133,11 @@ class AsyncConnectionConfiguration {
connectionConf.getReplicaCallTimeoutMicroSecondScan()); connectionConf.getReplicaCallTimeoutMicroSecondScan());
this.primaryMetaScanTimeoutNs = this.primaryMetaScanTimeoutNs =
TimeUnit.MICROSECONDS.toNanos(connectionConf.getMetaReplicaCallTimeoutMicroSecondScan()); TimeUnit.MICROSECONDS.toNanos(connectionConf.getMetaReplicaCallTimeoutMicroSecondScan());
this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos( long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
conf.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis);
this.metaScanTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis));
// fields not in connection configuration // fields not in connection configuration
this.startLogErrorsCnt = this.startLogErrorsCnt =
@ -152,6 +162,10 @@ class AsyncConnectionConfiguration {
return readRpcTimeoutNs; return readRpcTimeoutNs;
} }
long getMetaReadRpcTimeoutNs() {
return metaReadRpcTimeoutNs;
}
long getWriteRpcTimeoutNs() { long getWriteRpcTimeoutNs() {
return writeRpcTimeoutNs; return writeRpcTimeoutNs;
} }
@ -176,6 +190,10 @@ class AsyncConnectionConfiguration {
return scanTimeoutNs; return scanTimeoutNs;
} }
long getMetaScanTimeoutNs() {
return metaScanTimeoutNs;
}
int getScannerCaching() { int getScannerCaching() {
return scannerCaching; return scannerCaching;
} }

View File

@ -55,9 +55,12 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
this.tableName = tableName; this.tableName = tableName;
this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs() this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs()
: connConf.getOperationTimeoutNs(); : connConf.getOperationTimeoutNs();
this.scanTimeoutNs = connConf.getScanTimeoutNs(); this.scanTimeoutNs =
tableName.isSystemTable() ? connConf.getMetaScanTimeoutNs() : connConf.getScanTimeoutNs();
this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); this.readRpcTimeoutNs = tableName.isSystemTable()
? connConf.getMetaReadRpcTimeoutNs()
: connConf.getReadRpcTimeoutNs();
this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs(); this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
this.pauseNs = connConf.getPauseNs(); this.pauseNs = connConf.getPauseNs();
this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded(); this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();

View File

@ -60,11 +60,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
private final Condition notFull = lock.newCondition(); private final Condition notFull = lock.newCondition();
public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int replicaCallTimeoutMicroSecondScan) throws IOException { int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan); scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan);
exceptionsQueue = new ConcurrentLinkedQueue<>(); exceptionsQueue = new ConcurrentLinkedQueue<>();
Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher"); Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher");
} }

View File

@ -70,6 +70,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected final long maxScannerResultSize; protected final long maxScannerResultSize;
private final ClusterConnection connection; private final ClusterConnection connection;
protected final TableName tableName; protected final TableName tableName;
protected final int readRpcTimeout;
protected final int scannerTimeout; protected final int scannerTimeout;
protected boolean scanMetricsPublished = false; protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result[]> caller; protected RpcRetryingCaller<Result[]> caller;
@ -94,9 +95,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
* @throws IOException * @throws IOException
*/ */
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
throws IOException { int scannerTimeout, int primaryOperationTimeout) throws IOException {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace( LOG.trace(
"Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@ -115,8 +116,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
} }
this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, this.readRpcTimeout = scanReadRpcTimeout;
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); this.scannerTimeout = scannerTimeout;
// check if application wants to collect scan metrics // check if application wants to collect scan metrics
initScanMetrics(scan); initScanMetrics(scan);
@ -243,9 +244,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
// clear the current region, we will set a new value to it after the first call of the new // clear the current region, we will set a new value to it after the first call of the new
// callable. // callable.
this.currentRegion = null; this.currentRegion = null;
this.callable = this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller); scannerTimeout, caching, conf, caller);
this.callable.setCaching(this.caching); this.callable.setCaching(this.caching);
incRegionCountMetrics(scanMetrics); incRegionCountMetrics(scanMetrics);
return true; return true;

View File

@ -37,11 +37,11 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ClientSimpleScanner extends ClientScanner { public class ClientSimpleScanner extends ClientScanner {
public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int replicaCallTimeoutMicroSecondScan) throws IOException { int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan); scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan);
} }
@Override @Override

View File

@ -66,6 +66,11 @@ public class ConnectionConfiguration {
HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED);
} }
public static final String HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY =
"hbase.client.meta.read.rpc.timeout";
public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
"hbase.client.meta.scanner.timeout.period";
private final long writeBufferSize; private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs; private final long writeBufferPeriodicFlushTimeoutMs;
private final long writeBufferPeriodicFlushTimerTickMs; private final long writeBufferPeriodicFlushTimerTickMs;
@ -80,7 +85,11 @@ public class ConnectionConfiguration {
private final int maxKeyValueSize; private final int maxKeyValueSize;
private final int rpcTimeout; private final int rpcTimeout;
private final int readRpcTimeout; private final int readRpcTimeout;
private final int metaReadRpcTimeout;
private final int writeRpcTimeout; private final int writeRpcTimeout;
private final int scanTimeout;
private final int metaScanTimeout;
// toggle for async/sync prefetch // toggle for async/sync prefetch
private final boolean clientScannerAsyncPrefetch; private final boolean clientScannerAsyncPrefetch;
private final long pauseMs; private final long pauseMs;
@ -138,10 +147,17 @@ public class ConnectionConfiguration {
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.metaReadRpcTimeout = conf.getInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeout);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.scanTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout);
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
@ -178,8 +194,11 @@ public class ConnectionConfiguration {
this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.scanTimeout = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
this.metaScanTimeout = scanTimeout;
this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE; this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE; this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
} }
@ -188,6 +207,10 @@ public class ConnectionConfiguration {
return readRpcTimeout; return readRpcTimeout;
} }
public int getMetaReadRpcTimeout() {
return metaReadRpcTimeout;
}
public int getWriteRpcTimeout() { public int getWriteRpcTimeout() {
return writeRpcTimeout; return writeRpcTimeout;
} }
@ -248,6 +271,14 @@ public class ConnectionConfiguration {
return rpcTimeout; return rpcTimeout;
} }
public int getScanTimeout() {
return scanTimeout;
}
public int getMetaScanTimeout() {
return metaScanTimeout;
}
public long getPauseMillis() { public long getPauseMillis() {
return pauseMs; return pauseMs;
} }

View File

@ -934,7 +934,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
s.resetMvccReadPoint(); s.resetMvccReadPoint();
try (ReversedClientScanner rcs = try (ReversedClientScanner rcs =
new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) { rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(),
connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond)) {
boolean tableNotFound = true; boolean tableNotFound = true;
for (;;) { for (;;) {
Result regionInfoRow = rcs.next(); Result regionInfoRow = rcs.next();

View File

@ -114,6 +114,9 @@ public class HTable implements Table {
private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
private int readRpcTimeoutMs; // timeout for each read rpc request private int readRpcTimeoutMs; // timeout for each read rpc request
private int writeRpcTimeoutMs; // timeout for each write rpc request private int writeRpcTimeoutMs; // timeout for each write rpc request
private final int scanReadRpcTimeout;
private final int scanTimeout;
private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final HRegionLocator locator; private final HRegionLocator locator;
@ -187,6 +190,8 @@ public class HTable implements Table {
this.rpcTimeoutMs = builder.rpcTimeout; this.rpcTimeoutMs = builder.rpcTimeout;
this.readRpcTimeoutMs = builder.readRpcTimeout; this.readRpcTimeoutMs = builder.readRpcTimeout;
this.writeRpcTimeoutMs = builder.writeRpcTimeout; this.writeRpcTimeoutMs = builder.writeRpcTimeout;
this.scanReadRpcTimeout = builder.scanReadRpcTimeout;
this.scanTimeout = builder.scanTimeout;
this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
@ -306,24 +311,24 @@ public class HTable implements Table {
// it is not supposed to be set by user, clear // it is not supposed to be set by user, clear
scan.resetMvccReadPoint(); scan.resetMvccReadPoint();
} }
Boolean async = scan.isAsyncPrefetch(); final boolean async = scan.isAsyncPrefetch() != null
if (async == null) { ? scan.isAsyncPrefetch()
async = connConfiguration.isClientScannerAsyncPrefetch(); : connConfiguration.isClientScannerAsyncPrefetch();
} final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();
if (scan.isReversed()) { if (scan.isReversed()) {
return new ReversedClientScanner(getConfiguration(), scan, getName(), return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
this.connection, this.rpcCallerFactory, this.rpcControllerFactory, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); replicaTimeout);
} else { } else {
if (async) { if (async) {
return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
this.rpcCallerFactory, this.rpcControllerFactory, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); replicaTimeout);
} else { } else {
return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
this.rpcCallerFactory, this.rpcControllerFactory, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); replicaTimeout);
} }
} }
} }

View File

@ -1,4 +1,4 @@
/** /**
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -59,16 +59,17 @@ public class ResultBoundedCompletionService<V> {
private T result = null; private T result = null;
private ExecutionException exeEx = null; private ExecutionException exeEx = null;
private volatile boolean cancelled = false; private volatile boolean cancelled = false;
private final int callTimeout; private final int operationTimeout;
private final RpcRetryingCaller<T> retryingCaller; private final RpcRetryingCaller<T> retryingCaller;
private boolean resultObtained = false; private boolean resultObtained = false;
private final int replicaId; // replica id private final int replicaId; // replica id
public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) { public QueueingFuture(RetryingCallable<T> future, int rpcTimeout, int operationTimeout,
int id) {
this.future = future; this.future = future;
this.callTimeout = callTimeout; this.operationTimeout = operationTimeout;
this.retryingCaller = retryingCallerFactory.<T>newCaller(); this.retryingCaller = retryingCallerFactory.<T> newCaller(rpcTimeout);
this.replicaId = id; this.replicaId = id;
} }
@ -77,7 +78,7 @@ public class ResultBoundedCompletionService<V> {
public void run() { public void run() {
try { try {
if (!cancelled) { if (!cancelled) {
result = this.retryingCaller.callWithRetries(future, callTimeout); result = this.retryingCaller.callWithRetries(future, operationTimeout);
resultObtained = true; resultObtained = true;
} }
} catch (Throwable t) { } catch (Throwable t) {
@ -165,10 +166,10 @@ public class ResultBoundedCompletionService<V> {
this.completedTasks = new ArrayList<>(maxTasks); this.completedTasks = new ArrayList<>(maxTasks);
} }
public void submit(RetryingCallable<V> task, int rpcTimeout, int operationTimeout, int id) {
public void submit(RetryingCallable<V> task, int callTimeout, int id) { QueueingFuture<V> newFuture = new QueueingFuture<>(task, rpcTimeout, operationTimeout, id);
QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id); // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
executor.execute(TraceUtil.wrap(newFuture, "ResultBoundedCompletionService.submit")); executor.execute(newFuture);
tasks[id] = newFuture; tasks[id] = newFuture;
} }

View File

@ -39,11 +39,11 @@ public class ReversedClientScanner extends ClientScanner {
* {@link Scan}'s start row maybe changed. * {@link Scan}'s start row maybe changed.
*/ */
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
throws IOException { int scannerTimeout, int primaryOperationTimeout) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout); scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout);
} }
@Override @Override

View File

@ -314,7 +314,7 @@ public class RpcRetryingCallerWithReadReplicas {
for (int id = min; id <= max; id++) { for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id); HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
cs.submit(callOnReplica, operationTimeout, id); cs.submit(callOnReplica, rpcTimeout, operationTimeout, id);
} }
} }

View File

@ -68,15 +68,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private final RpcRetryingCaller<Result[]> caller; private final RpcRetryingCaller<Result[]> caller;
private final TableName tableName; private final TableName tableName;
private Configuration conf; private Configuration conf;
private int scannerTimeout; private final int scannerTimeout;
private final int readRpcTimeout;
private Set<ScannerCallable> outstandingCallables = new HashSet<>(); private Set<ScannerCallable> outstandingCallables = new HashSet<>();
private boolean someRPCcancelled = false; //required for testing purposes only private boolean someRPCcancelled = false; //required for testing purposes only
private int regionReplication = 0; private int regionReplication = 0;
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
int retries, int scannerTimeout, int caching, Configuration conf, int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
RpcRetryingCaller<Result []> caller) { RpcRetryingCaller<Result[]> caller) {
this.currentScannerCallable = baseCallable; this.currentScannerCallable = baseCallable;
this.cConnection = cConnection; this.cConnection = cConnection;
this.pool = pool; this.pool = pool;
@ -88,6 +89,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
this.retries = retries; this.retries = retries;
this.tableName = tableName; this.tableName = tableName;
this.conf = conf; this.conf = conf;
this.readRpcTimeout = readRpcTimeout;
this.scannerTimeout = scannerTimeout; this.scannerTimeout = scannerTimeout;
this.caller = caller; this.caller = caller;
} }
@ -326,7 +328,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) { ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable); outstandingCallables.add(currentScannerCallable);
cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
} }
private void addCallsForOtherReplicas( private void addCallsForOtherReplicas(
@ -340,7 +342,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
setStartRowForReplicaCallable(s); setStartRowForReplicaCallable(s);
outstandingCallables.add(s); outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s); RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica, scannerTimeout, id); cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
} }
} }

View File

@ -35,6 +35,8 @@ abstract class TableBuilderBase implements TableBuilder {
protected int readRpcTimeout; protected int readRpcTimeout;
protected int writeRpcTimeout; protected int writeRpcTimeout;
protected final int scanReadRpcTimeout;
protected int scanTimeout;
TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) { TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) {
if (tableName == null) { if (tableName == null) {
@ -45,6 +47,10 @@ abstract class TableBuilderBase implements TableBuilder {
: connConf.getOperationTimeout(); : connConf.getOperationTimeout();
this.rpcTimeout = connConf.getRpcTimeout(); this.rpcTimeout = connConf.getRpcTimeout();
this.readRpcTimeout = connConf.getReadRpcTimeout(); this.readRpcTimeout = connConf.getReadRpcTimeout();
this.scanReadRpcTimeout =
tableName.isSystemTable() ? connConf.getMetaReadRpcTimeout() : readRpcTimeout;
this.scanTimeout =
tableName.isSystemTable() ? connConf.getMetaScanTimeout() : connConf.getScanTimeout();
this.writeRpcTimeout = connConf.getWriteRpcTimeout(); this.writeRpcTimeout = connConf.getWriteRpcTimeout();
} }

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
@ -107,7 +108,8 @@ public class TestClientScanner {
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException { throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout); HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout);
} }
@Override @Override
@ -495,7 +497,7 @@ public class TestClientScanner {
} }
@Override @Override
public <T> RpcRetryingCaller<T> newCaller() { public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
return new RpcRetryingCaller<T>() { return new RpcRetryingCaller<T>() {
@Override @Override
public void cancel() { public void cancel() {

View File

@ -0,0 +1,488 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@Category({ MediumTests.class, ClientTests.class })
public class TestClientScannerTimeouts {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestClientScannerTimeouts.class);
private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static AsyncConnection ASYNC_CONN;
private static Connection CONN;
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static final byte[] VALUE = Bytes.toBytes("testValue");
private static final byte[] ROW0 = Bytes.toBytes("row-0");
private static final byte[] ROW1 = Bytes.toBytes("row-1");
private static final byte[] ROW2 = Bytes.toBytes("row-2");
private static final byte[] ROW3 = Bytes.toBytes("row-3");
private static final int rpcTimeout = 1000;
private static final int scanTimeout = 3 * rpcTimeout;
private static final int metaScanTimeout = 6 * rpcTimeout;
private static final int CLIENT_RETRIES_NUMBER = 3;
private static TableName tableName;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Don't report so often so easier to see other rpcs
conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
TEST_UTIL.startMiniCluster(1);
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
CONN = ConnectionFactory.createConnection(conf);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
CONN.close();
ASYNC_CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
public void setup(boolean isSystemTable) throws IOException {
RSRpcServicesWithScanTimeout.reset();
String nameAsString = name.getMethodName();
if (isSystemTable) {
nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString;
}
tableName = TableName.valueOf(nameAsString);
TEST_UTIL.createTable(tableName, FAMILY);
Table table = CONN.getTable(tableName);
putToTable(table, ROW0);
putToTable(table, ROW1);
putToTable(table, ROW2);
putToTable(table, ROW3);
LOG.info("Wrote our four values");
table.getRegionLocator().getAllRegionLocations();
// reset again incase the creation/population caused anything to trigger
RSRpcServicesWithScanTimeout.reset();
}
private void expectRow(byte[] expected, Result result) {
assertTrue("Expected row: " + Bytes.toString(expected),
Bytes.equals(expected, result.getRow()));
}
private void expectNumTries(int expected) {
assertEquals(
"Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber,
expected, RSRpcServicesWithScanTimeout.tryNumber);
// reset for next
RSRpcServicesWithScanTimeout.tryNumber = 0;
}
/**
* verify that we don't miss any data when encountering an OutOfOrderScannerNextException.
* Typically, the only way to naturally trigger this is if a client-side timeout causes an
* erroneous next() call. This is relatively hard to do these days because the server attempts to
* always return before the timeout. In this test we force the server to throw this exception, so
* that we can test the retry logic appropriately.
*/
@Test
public void testRetryOutOfOrderScannerNextException() throws IOException {
expectRetryOutOfOrderScannerNext(() -> getScanner(CONN));
}
/**
* AsyncTable version of above
*/
@Test
public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException {
expectRetryOutOfOrderScannerNext(this::getAsyncScanner);
}
/**
* verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for normal scans. Use a
* special connection which has retries disabled, because otherwise the scanner will retry the
* timed out next() call and mess up the test.
*/
@Test
public void testNormalScanTimeoutOnNext() throws IOException {
setup(false);
// Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and
// will retry until scannerTimeout. This makes it more difficult to test the timeouts
// of normal next() calls. So we use a separate connection here which has retries disabled.
Configuration confNoRetries = new Configuration(CONN.getConfiguration());
confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) {
expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn));
}
}
/**
* AsyncTable version of above
*/
@Test
public void testNormalScanTimeoutOnNextAsync() throws IOException {
setup(false);
expectTimeoutOnNext(scanTimeout, this::getAsyncScanner);
}
/**
* verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for openScanner() calls for
* meta scans
*/
@Test
public void testNormalScanTimeoutOnOpenScanner() throws IOException {
setup(false);
expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner);
}
/**
* AsyncTable version of above
*/
@Test
public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException {
setup(false);
expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner);
}
/**
* verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for
* next() calls in meta scans
*/
@Test
public void testMetaScanTimeoutOnNext() throws IOException {
setup(true);
expectTimeoutOnNext(metaScanTimeout, this::getScanner);
}
/**
* AsyncTable version of above
*/
@Test
public void testMetaScanTimeoutOnNextAsync() throws IOException {
setup(true);
expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner);
}
/**
* verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for
* openScanner() calls for meta scans
*/
@Test
public void testMetaScanTimeoutOnOpenScanner() throws IOException {
setup(true);
expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
}
/**
* AsyncTable version of above
*/
@Test
public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException {
setup(true);
expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
}
private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier)
throws IOException {
setup(false);
RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1;
LOG.info(
"Opening scanner, expecting no errors from first next() call from openScanner response");
ResultScanner scanner = scannerSupplier.get();
Result result = scanner.next();
expectRow(ROW0, result);
expectNumTries(0);
LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
result = scanner.next();
expectRow(ROW1, result);
expectNumTries(0);
LOG.info(
"Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
result = scanner.next();
expectRow(ROW2, result);
expectNumTries(1);
// reset so no errors. since last call restarted the scan and following
// call would otherwise fail
RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;
LOG.info("Finishing scan, expecting no errors");
result = scanner.next();
expectRow(ROW3, result);
scanner.close();
LOG.info("Testing always throw exception");
byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 };
int i = 0;
// test the case that RPC always throws
scanner = scannerSupplier.get();
RSRpcServicesWithScanTimeout.throwAlways = true;
while (true) {
LOG.info("Calling scanner.next()");
result = scanner.next();
if (result == null) {
break;
} else {
byte[] expectedResult = expectedResults[i++];
expectRow(expectedResult, result);
}
}
// ensure we verified all rows. this along with the expectRow check above
// proves that we didn't miss any rows.
assertEquals("Expected to exhaust expectedResults array length=" + expectedResults.length
+ ", actual index=" + i, expectedResults.length, i);
// expect all but the first row (which came from initial openScanner) to have thrown an error
expectNumTries(expectedResults.length - 1);
}
private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier)
throws IOException {
RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
LOG.info(
"Opening scanner, expecting no timeouts from first next() call from openScanner response");
ResultScanner scanner = scannerSupplier.get();
Result result = scanner.next();
expectRow(ROW0, result);
LOG.info("Making first next() RPC, expecting no timeout for seqNo 0");
result = scanner.next();
expectRow(ROW1, result);
LOG.info("Making second next() RPC, expecting timeout");
long start = System.nanoTime();
try {
scanner.next();
fail("Expected CallTimeoutException");
} catch (RetriesExhaustedException e) {
assertTrue("Expected CallTimeoutException", e.getCause() instanceof CallTimeoutException
|| e.getCause() instanceof SocketTimeoutException);
}
expectTimeout(start, timeout);
}
private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier)
throws IOException {
RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
RSRpcServicesWithScanTimeout.sleepOnOpen = true;
LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response");
long start = System.nanoTime();
try {
scannerSupplier.get().next();
fail("Expected SocketTimeoutException or CallTimeoutException");
} catch (RetriesExhaustedException e) {
LOG.info("Got error", e);
assertTrue("Expected SocketTimeoutException or CallTimeoutException, but was " + e.getCause(),
e.getCause() instanceof CallTimeoutException
|| e.getCause() instanceof SocketTimeoutException);
}
expectTimeout(start, timeout);
}
private void expectTimeout(long start, int timeout) {
long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
LOG.info("Expected duration >= {}, and got {}", timeout, duration);
assertTrue("Expected duration >= " + timeout + ", but was " + duration, duration >= timeout);
}
private ResultScanner getScanner() {
return getScanner(CONN);
}
private ResultScanner getScanner(Connection conn) {
Scan scan = new Scan();
scan.setCaching(1);
try {
return conn.getTable(tableName).getScanner(scan);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private ResultScanner getAsyncScanner() {
Scan scan = new Scan();
scan.setCaching(1);
return ASYNC_CONN.getTable(tableName).getScanner(scan);
}
private void putToTable(Table ht, byte[] rowkey) throws IOException {
Put put = new Put(rowkey);
put.addColumn(FAMILY, QUALIFIER, VALUE);
ht.put(put);
}
private static class RegionServerWithScanTimeout
extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public RegionServerWithScanTimeout(Configuration conf)
throws IOException, InterruptedException {
super(conf);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new RSRpcServicesWithScanTimeout(this);
}
}
private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
private long tableScannerId;
private static long seqNoToThrowOn = -1;
private static boolean throwAlways = false;
private static boolean threw;
private static long seqNoToSleepOn = -1;
private static boolean sleepOnOpen = false;
private static volatile boolean slept;
private static int tryNumber = 0;
private static int sleepTime = rpcTimeout + 500;
public static void setSleepForTimeout(int timeout) {
sleepTime = timeout + 500;
}
public static void reset() {
setSleepForTimeout(scanTimeout);
seqNoToSleepOn = -1;
seqNoToThrowOn = -1;
throwAlways = false;
threw = false;
sleepOnOpen = false;
slept = false;
tryNumber = 0;
}
public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
super(rs);
}
@Override
public ScanResponse scan(final RpcController controller, final ScanRequest request)
throws ServiceException {
if (request.hasScannerId()) {
LOG.info("Got request {}", request);
ScanResponse scanResponse = super.scan(controller, request);
if (tableScannerId != request.getScannerId() || request.getCloseScanner()) {
return scanResponse;
}
if (
throwAlways
|| (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq())
) {
threw = true;
tryNumber++;
LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber,
tableScannerId);
throw new ServiceException(new OutOfOrderScannerNextException());
}
if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) {
try {
LOG.info("SLEEPING " + sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
}
slept = true;
tryNumber++;
}
return scanResponse;
} else {
ScanResponse scanRes = super.scan(controller, request);
String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
tableScannerId = scanRes.getScannerId();
if (sleepOnOpen) {
try {
LOG.info("openScanner SLEEPING " + sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
}
}
}
return scanRes;
}
}
}
}

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst; import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
@ -91,6 +93,7 @@ public class TestScannerHeartbeatMessages {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Table TABLE = null; private static Table TABLE = null;
private static Connection CONN = null;
/** /**
* Table configuration * Table configuration
@ -134,6 +137,7 @@ public class TestScannerHeartbeatMessages {
conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
// setting these here for usage on the server side. will override for client side below
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
@ -142,6 +146,10 @@ public class TestScannerHeartbeatMessages {
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
// set client timeout for client side, we want it to be less than server side.
Configuration clientConf = new Configuration(conf);
clientConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
CONN = ConnectionFactory.createConnection(clientConf);
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
} }
@ -180,11 +188,11 @@ public class TestScannerHeartbeatMessages {
} }
static Table createTestTable(TableName name, byte[][] rows, byte[][] families, static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
byte[][] qualifiers, byte[] cellValue) throws IOException { byte[][] qualifiers, byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families); TEST_UTIL.createTable(name, families);
Table ht = CONN.getTable(name);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue); List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts); ht.put(puts);
ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
return ht; return ht;
} }
@ -212,6 +220,7 @@ public class TestScannerHeartbeatMessages {
@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
CONN.close();
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }