HBASE-27078 Allow configuring a separate timeout for meta scans (#4585)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Bryan Beaudreault 2022-07-07 16:33:16 -04:00
parent 3ca8484c56
commit 94fc45ef76
17 changed files with 615 additions and 242 deletions

View File

@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PE
import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@ -57,6 +59,9 @@ class AsyncConnectionConfiguration {
// timeout for each read rpc request
private final long readRpcTimeoutNs;
// timeout for each read rpc request against system tables
private final long metaReadRpcTimeoutNs;
// timeout for each write rpc request
private final long writeRpcTimeoutNs;
@ -74,6 +79,7 @@ class AsyncConnectionConfiguration {
// client that it is still alive. The scan timeout is used as operation timeout for every
// operations in a scan, such as openScanner or next.
private final long scanTimeoutNs;
private final long metaScanTimeoutNs;
private final int scannerCaching;
@ -111,8 +117,11 @@ class AsyncConnectionConfiguration {
TimeUnit.MILLISECONDS.toNanos(connectionConf.getMetaOperationTimeout());
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getOperationTimeout());
this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getRpcTimeout());
this.readRpcTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, connectionConf.getReadRpcTimeout()));
long readRpcTimeoutMillis =
conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, connectionConf.getRpcTimeout());
this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis);
this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis));
this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, connectionConf.getWriteRpcTimeout()));
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getPauseMillis());
@ -124,8 +133,11 @@ class AsyncConnectionConfiguration {
TimeUnit.MICROSECONDS.toNanos(connectionConf.getReplicaCallTimeoutMicroSecondScan());
this.primaryMetaScanTimeoutNs =
TimeUnit.MICROSECONDS.toNanos(connectionConf.getMetaReplicaCallTimeoutMicroSecondScan());
this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf
.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis);
this.metaScanTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis));
// fields not in connection configuration
this.startLogErrorsCnt =
@ -150,6 +162,10 @@ class AsyncConnectionConfiguration {
return readRpcTimeoutNs;
}
long getMetaReadRpcTimeoutNs() {
return metaReadRpcTimeoutNs;
}
long getWriteRpcTimeoutNs() {
return writeRpcTimeoutNs;
}
@ -174,6 +190,10 @@ class AsyncConnectionConfiguration {
return scanTimeoutNs;
}
long getMetaScanTimeoutNs() {
return metaScanTimeoutNs;
}
int getScannerCaching() {
return scannerCaching;
}

View File

@ -55,9 +55,12 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
this.operationTimeoutNs = tableName.isSystemTable()
? connConf.getMetaOperationTimeoutNs()
: connConf.getOperationTimeoutNs();
this.scanTimeoutNs = connConf.getScanTimeoutNs();
this.scanTimeoutNs =
tableName.isSystemTable() ? connConf.getMetaScanTimeoutNs() : connConf.getScanTimeoutNs();
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
this.readRpcTimeoutNs = tableName.isSystemTable()
? connConf.getMetaReadRpcTimeoutNs()
: connConf.getReadRpcTimeoutNs();
this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();

View File

@ -63,10 +63,10 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool,
int replicaCallTimeoutMicroSecondScan) throws IOException {
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan);
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan);
exceptionsQueue = new ConcurrentLinkedQueue<>();
final Context context = Context.current();
final Runnable runnable = context.wrap(new PrefetchRunnable());

View File

@ -75,6 +75,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected final long maxScannerResultSize;
private final ClusterConnection connection;
protected final TableName tableName;
protected final int readRpcTimeout;
protected final int scannerTimeout;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result[]> caller;
@ -100,8 +101,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
*/
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@ -120,8 +121,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
}
this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.readRpcTimeout = scanReadRpcTimeout;
this.scannerTimeout = scannerTimeout;
// check if application wants to collect scan metrics
initScanMetrics(scan);
@ -248,9 +249,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
// clear the current region, we will set a new value to it after the first call of the new
// callable.
this.currentRegion = null;
this.callable =
new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
scannerTimeout, caching, conf, caller);
this.callable.setCaching(this.caching);
incRegionCountMetrics(scanMetrics);
return true;

View File

@ -36,10 +36,10 @@ import org.apache.yetus.audience.InterfaceAudience;
public class ClientSimpleScanner extends ClientScanner {
public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool,
int replicaCallTimeoutMicroSecondScan) throws IOException {
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan);
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan);
}
@Override

View File

@ -71,6 +71,11 @@ public class ConnectionConfiguration {
HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED);
}
public static final String HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY =
"hbase.client.meta.read.rpc.timeout";
public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
"hbase.client.meta.scanner.timeout.period";
private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
private final long writeBufferPeriodicFlushTimerTickMs;
@ -85,7 +90,11 @@ public class ConnectionConfiguration {
private final int maxKeyValueSize;
private final int rpcTimeout;
private final int readRpcTimeout;
private final int metaReadRpcTimeout;
private final int writeRpcTimeout;
private final int scanTimeout;
private final int metaScanTimeout;
// toggle for async/sync prefetch
private final boolean clientScannerAsyncPrefetch;
private final long pauseMs;
@ -140,9 +149,16 @@ public class ConnectionConfiguration {
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.metaReadRpcTimeout = conf.getInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeout);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.scanTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout);
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
@ -178,8 +194,11 @@ public class ConnectionConfiguration {
this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.scanTimeout = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
this.metaScanTimeout = scanTimeout;
this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
}
@ -188,6 +207,10 @@ public class ConnectionConfiguration {
return readRpcTimeout;
}
public int getMetaReadRpcTimeout() {
return metaReadRpcTimeout;
}
public int getWriteRpcTimeout() {
return writeRpcTimeout;
}
@ -248,6 +271,14 @@ public class ConnectionConfiguration {
return rpcTimeout;
}
public int getScanTimeout() {
return scanTimeout;
}
public int getMetaScanTimeout() {
return metaScanTimeout;
}
public long getPauseMillis() {
return pauseMs;
}

View File

@ -960,7 +960,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
try (Scope ignored = span.makeCurrent();
ReversedClientScanner rcs =
new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(),
connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond)) {
boolean tableNotFound = true;
for (;;) {
Result regionInfoRow = rcs.next();
@ -2206,7 +2207,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
}
}
@Override
public String getClusterId() {
try {

View File

@ -120,6 +120,9 @@ public class HTable implements Table {
private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
private int readRpcTimeoutMs; // timeout for each read rpc request
private int writeRpcTimeoutMs; // timeout for each write rpc request
private final int scanReadRpcTimeout;
private final int scanTimeout;
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final HRegionLocator locator;
@ -191,6 +194,8 @@ public class HTable implements Table {
this.rpcTimeoutMs = builder.rpcTimeout;
this.readRpcTimeoutMs = builder.readRpcTimeout;
this.writeRpcTimeoutMs = builder.writeRpcTimeout;
this.scanReadRpcTimeout = builder.scanReadRpcTimeout;
this.scanTimeout = builder.scanTimeout;
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
@ -312,18 +317,21 @@ public class HTable implements Table {
final boolean async = scan.isAsyncPrefetch() != null
? scan.isAsyncPrefetch()
: connConfiguration.isClientScannerAsyncPrefetch();
final int timeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();
final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();
if (scan.isReversed()) {
return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, timeout);
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout);
} else {
if (async) {
return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, timeout);
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout);
} else {
return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, timeout);
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout);
}
}
}

View File

@ -53,15 +53,16 @@ public class ResultBoundedCompletionService<V> {
private T result = null;
private ExecutionException exeEx = null;
private volatile boolean cancelled = false;
private final int callTimeout;
private final int operationTimeout;
private final RpcRetryingCaller<T> retryingCaller;
private boolean resultObtained = false;
private final int replicaId; // replica id
public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) {
public QueueingFuture(RetryingCallable<T> future, int rpcTimeout, int operationTimeout,
int id) {
this.future = future;
this.callTimeout = callTimeout;
this.retryingCaller = retryingCallerFactory.<T> newCaller();
this.operationTimeout = operationTimeout;
this.retryingCaller = retryingCallerFactory.<T> newCaller(rpcTimeout);
this.replicaId = id;
}
@ -70,7 +71,7 @@ public class ResultBoundedCompletionService<V> {
public void run() {
try {
if (!cancelled) {
result = this.retryingCaller.callWithRetries(future, callTimeout);
result = this.retryingCaller.callWithRetries(future, operationTimeout);
resultObtained = true;
}
} catch (Throwable t) {
@ -157,8 +158,8 @@ public class ResultBoundedCompletionService<V> {
this.completedTasks = new ArrayList<>(maxTasks);
}
public void submit(RetryingCallable<V> task, int callTimeout, int id) {
QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id);
public void submit(RetryingCallable<V> task, int rpcTimeout, int operationTimeout, int id) {
QueueingFuture<V> newFuture = new QueueingFuture<>(task, rpcTimeout, operationTimeout, id);
// remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
executor.execute(newFuture);
tasks[id] = newFuture;

View File

@ -38,10 +38,10 @@ public class ReversedClientScanner extends ClientScanner {
*/
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout);
}
@Override

View File

@ -305,7 +305,7 @@ public class RpcRetryingCallerWithReadReplicas {
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
cs.submit(callOnReplica, operationTimeout, id);
cs.submit(callOnReplica, rpcTimeout, operationTimeout, id);
}
}

View File

@ -64,14 +64,15 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private final RpcRetryingCaller<Result[]> caller;
private final TableName tableName;
private Configuration conf;
private int scannerTimeout;
private final int scannerTimeout;
private final int readRpcTimeout;
private Set<ScannerCallable> outstandingCallables = new HashSet<>();
private boolean someRPCcancelled = false; // required for testing purposes only
private int regionReplication = 0;
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
int retries, int scannerTimeout, int caching, Configuration conf,
int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
RpcRetryingCaller<Result[]> caller) {
this.currentScannerCallable = baseCallable;
this.cConnection = cConnection;
@ -84,6 +85,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
this.retries = retries;
this.tableName = tableName;
this.conf = conf;
this.readRpcTimeout = readRpcTimeout;
this.scannerTimeout = scannerTimeout;
this.caller = caller;
}
@ -323,7 +325,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable);
cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
}
private void addCallsForOtherReplicas(
@ -337,7 +339,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
setStartRowForReplicaCallable(s);
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica, scannerTimeout, id);
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
}
}

View File

@ -35,6 +35,8 @@ abstract class TableBuilderBase implements TableBuilder {
protected int readRpcTimeout;
protected int writeRpcTimeout;
protected final int scanReadRpcTimeout;
protected int scanTimeout;
TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) {
if (tableName == null) {
@ -46,6 +48,10 @@ abstract class TableBuilderBase implements TableBuilder {
: connConf.getOperationTimeout();
this.rpcTimeout = connConf.getRpcTimeout();
this.readRpcTimeout = connConf.getReadRpcTimeout();
this.scanReadRpcTimeout =
tableName.isSystemTable() ? connConf.getMetaReadRpcTimeout() : readRpcTimeout;
this.scanTimeout =
tableName.isSystemTable() ? connConf.getMetaScanTimeout() : connConf.getScanTimeout();
this.writeRpcTimeout = connConf.getWriteRpcTimeout();
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.RegionLocations;
@ -107,7 +108,8 @@ public class TestClientScanner {
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout);
}
@Override
@ -500,7 +502,7 @@ public class TestClientScanner {
}
@Override
public <T> RpcRetryingCaller<T> newCaller() {
public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
return new RpcRetryingCaller<T>() {
@Override
public void cancel() {

View File

@ -1,198 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
/**
* Test the scenario where a HRegionServer#scan() call, while scanning, timeout at client side and
* getting retried. This scenario should not result in some data being skipped at RS side.
*/
@Category({ MediumTests.class, ClientTests.class })
public class TestClientScannerRPCTimeout {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestClientScannerRPCTimeout.class);
private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerRPCTimeout.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static final byte[] VALUE = Bytes.toBytes("testValue");
private static final int rpcTimeout = 2 * 1000;
private static final int CLIENT_RETRIES_NUMBER = 3;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Don't report so often so easier to see other rpcs
conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testScannerNextRPCTimesout() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
Table ht = TEST_UTIL.createTable(tableName, FAMILY);
byte[] r0 = Bytes.toBytes("row-0");
byte[] r1 = Bytes.toBytes("row-1");
byte[] r2 = Bytes.toBytes("row-2");
byte[] r3 = Bytes.toBytes("row-3");
putToTable(ht, r0);
putToTable(ht, r1);
putToTable(ht, r2);
putToTable(ht, r3);
LOG.info("Wrote our three values");
RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
Scan scan = new Scan();
scan.setCaching(1);
ResultScanner scanner = ht.getScanner(scan);
Result result = scanner.next();
// fetched when openScanner
assertTrue("Expected row: row-0", Bytes.equals(r0, result.getRow()));
result = scanner.next();
assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow()));
LOG.info("Got expected first row");
long t1 = EnvironmentEdgeManager.currentTime();
result = scanner.next();
assertTrue((EnvironmentEdgeManager.currentTime() - t1) > rpcTimeout);
assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow()));
RSRpcServicesWithScanTimeout.seqNoToSleepOn = -1;// No need of sleep
result = scanner.next();
assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
scanner.close();
// test the case that RPC is always timesout
scanner = ht.getScanner(scan);
RSRpcServicesWithScanTimeout.sleepAlways = true;
RSRpcServicesWithScanTimeout.tryNumber = 0;
try {
result = scanner.next();
} catch (IOException ioe) {
// catch the exception after max retry number
LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe);
}
assertTrue(
"Expected maximal try number=" + CLIENT_RETRIES_NUMBER + ", actual ="
+ RSRpcServicesWithScanTimeout.tryNumber,
RSRpcServicesWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER);
}
private void putToTable(Table ht, byte[] rowkey) throws IOException {
Put put = new Put(rowkey);
put.addColumn(FAMILY, QUALIFIER, VALUE);
ht.put(put);
}
private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
public RegionServerWithScanTimeout(Configuration conf)
throws IOException, InterruptedException {
super(conf);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new RSRpcServicesWithScanTimeout(this);
}
}
private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
private long tableScannerId;
private boolean slept;
private static long seqNoToSleepOn = -1;
private static boolean sleepAlways = false;
private static int tryNumber = 0;
public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
super(rs);
}
@Override
public ScanResponse scan(final RpcController controller, final ScanRequest request)
throws ServiceException {
if (request.hasScannerId()) {
ScanResponse scanResponse = super.scan(controller, request);
if (
this.tableScannerId == request.getScannerId()
&& (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))
) {
try {
LOG.info("SLEEPING " + (rpcTimeout + 500));
Thread.sleep(rpcTimeout + 500);
} catch (InterruptedException e) {
}
slept = true;
tryNumber++;
if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) {
sleepAlways = false;
}
}
return scanResponse;
} else {
ScanResponse scanRes = super.scan(controller, request);
String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
tableScannerId = scanRes.getScannerId();
}
return scanRes;
}
}
}
}

View File

@ -0,0 +1,488 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@Category({ MediumTests.class, ClientTests.class })
public class TestClientScannerTimeouts {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestClientScannerTimeouts.class);
private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static AsyncConnection ASYNC_CONN;
private static Connection CONN;
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static final byte[] VALUE = Bytes.toBytes("testValue");
private static final byte[] ROW0 = Bytes.toBytes("row-0");
private static final byte[] ROW1 = Bytes.toBytes("row-1");
private static final byte[] ROW2 = Bytes.toBytes("row-2");
private static final byte[] ROW3 = Bytes.toBytes("row-3");
private static final int rpcTimeout = 1000;
private static final int scanTimeout = 3 * rpcTimeout;
private static final int metaScanTimeout = 6 * rpcTimeout;
private static final int CLIENT_RETRIES_NUMBER = 3;
private static TableName tableName;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Don't report so often so easier to see other rpcs
conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
TEST_UTIL.startMiniCluster(1);
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
CONN = ConnectionFactory.createConnection(conf);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
CONN.close();
ASYNC_CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
public void setup(boolean isSystemTable) throws IOException {
RSRpcServicesWithScanTimeout.reset();
String nameAsString = name.getMethodName();
if (isSystemTable) {
nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString;
}
tableName = TableName.valueOf(nameAsString);
TEST_UTIL.createTable(tableName, FAMILY);
Table table = CONN.getTable(tableName);
putToTable(table, ROW0);
putToTable(table, ROW1);
putToTable(table, ROW2);
putToTable(table, ROW3);
LOG.info("Wrote our four values");
table.getRegionLocator().getAllRegionLocations();
// reset again incase the creation/population caused anything to trigger
RSRpcServicesWithScanTimeout.reset();
}
private void expectRow(byte[] expected, Result result) {
assertTrue("Expected row: " + Bytes.toString(expected),
Bytes.equals(expected, result.getRow()));
}
private void expectNumTries(int expected) {
assertEquals(
"Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber,
expected, RSRpcServicesWithScanTimeout.tryNumber);
// reset for next
RSRpcServicesWithScanTimeout.tryNumber = 0;
}
/**
* verify that we don't miss any data when encountering an OutOfOrderScannerNextException.
* Typically, the only way to naturally trigger this is if a client-side timeout causes an
* erroneous next() call. This is relatively hard to do these days because the server attempts to
* always return before the timeout. In this test we force the server to throw this exception, so
* that we can test the retry logic appropriately.
*/
@Test
public void testRetryOutOfOrderScannerNextException() throws IOException {
expectRetryOutOfOrderScannerNext(() -> getScanner(CONN));
}
/**
* AsyncTable version of above
*/
@Test
public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException {
expectRetryOutOfOrderScannerNext(this::getAsyncScanner);
}
/**
* verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for normal scans. Use a
* special connection which has retries disabled, because otherwise the scanner will retry the
* timed out next() call and mess up the test.
*/
@Test
public void testNormalScanTimeoutOnNext() throws IOException {
setup(false);
// Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and
// will retry until scannerTimeout. This makes it more difficult to test the timeouts
// of normal next() calls. So we use a separate connection here which has retries disabled.
Configuration confNoRetries = new Configuration(CONN.getConfiguration());
confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) {
expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn));
}
}
/**
* AsyncTable version of above
*/
@Test
public void testNormalScanTimeoutOnNextAsync() throws IOException {
setup(false);
expectTimeoutOnNext(scanTimeout, this::getAsyncScanner);
}
/**
* verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for openScanner() calls for
* meta scans
*/
@Test
public void testNormalScanTimeoutOnOpenScanner() throws IOException {
setup(false);
expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner);
}
/**
* AsyncTable version of above
*/
@Test
public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException {
setup(false);
expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner);
}
/**
* verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for
* next() calls in meta scans
*/
@Test
public void testMetaScanTimeoutOnNext() throws IOException {
setup(true);
expectTimeoutOnNext(metaScanTimeout, this::getScanner);
}
/**
* AsyncTable version of above
*/
@Test
public void testMetaScanTimeoutOnNextAsync() throws IOException {
setup(true);
expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner);
}
/**
* verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for
* openScanner() calls for meta scans
*/
@Test
public void testMetaScanTimeoutOnOpenScanner() throws IOException {
setup(true);
expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
}
/**
* AsyncTable version of above
*/
@Test
public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException {
setup(true);
expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
}
private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier)
throws IOException {
setup(false);
RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1;
LOG.info(
"Opening scanner, expecting no errors from first next() call from openScanner response");
ResultScanner scanner = scannerSupplier.get();
Result result = scanner.next();
expectRow(ROW0, result);
expectNumTries(0);
LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
result = scanner.next();
expectRow(ROW1, result);
expectNumTries(0);
LOG.info(
"Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
result = scanner.next();
expectRow(ROW2, result);
expectNumTries(1);
// reset so no errors. since last call restarted the scan and following
// call would otherwise fail
RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;
LOG.info("Finishing scan, expecting no errors");
result = scanner.next();
expectRow(ROW3, result);
scanner.close();
LOG.info("Testing always throw exception");
byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 };
int i = 0;
// test the case that RPC always throws
scanner = scannerSupplier.get();
RSRpcServicesWithScanTimeout.throwAlways = true;
while (true) {
LOG.info("Calling scanner.next()");
result = scanner.next();
if (result == null) {
break;
} else {
byte[] expectedResult = expectedResults[i++];
expectRow(expectedResult, result);
}
}
// ensure we verified all rows. this along with the expectRow check above
// proves that we didn't miss any rows.
assertEquals("Expected to exhaust expectedResults array length=" + expectedResults.length
+ ", actual index=" + i, expectedResults.length, i);
// expect all but the first row (which came from initial openScanner) to have thrown an error
expectNumTries(expectedResults.length - 1);
}
private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier)
throws IOException {
RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
LOG.info(
"Opening scanner, expecting no timeouts from first next() call from openScanner response");
ResultScanner scanner = scannerSupplier.get();
Result result = scanner.next();
expectRow(ROW0, result);
LOG.info("Making first next() RPC, expecting no timeout for seqNo 0");
result = scanner.next();
expectRow(ROW1, result);
LOG.info("Making second next() RPC, expecting timeout");
long start = System.nanoTime();
try {
scanner.next();
fail("Expected CallTimeoutException");
} catch (RetriesExhaustedException e) {
assertTrue("Expected CallTimeoutException", e.getCause() instanceof CallTimeoutException
|| e.getCause() instanceof SocketTimeoutException);
}
expectTimeout(start, timeout);
}
private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier)
throws IOException {
RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
RSRpcServicesWithScanTimeout.sleepOnOpen = true;
LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response");
long start = System.nanoTime();
try {
scannerSupplier.get().next();
fail("Expected SocketTimeoutException or CallTimeoutException");
} catch (RetriesExhaustedException e) {
LOG.info("Got error", e);
assertTrue("Expected SocketTimeoutException or CallTimeoutException, but was " + e.getCause(),
e.getCause() instanceof CallTimeoutException
|| e.getCause() instanceof SocketTimeoutException);
}
expectTimeout(start, timeout);
}
private void expectTimeout(long start, int timeout) {
long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
LOG.info("Expected duration >= {}, and got {}", timeout, duration);
assertTrue("Expected duration >= " + timeout + ", but was " + duration, duration >= timeout);
}
private ResultScanner getScanner() {
return getScanner(CONN);
}
private ResultScanner getScanner(Connection conn) {
Scan scan = new Scan();
scan.setCaching(1);
try {
return conn.getTable(tableName).getScanner(scan);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private ResultScanner getAsyncScanner() {
Scan scan = new Scan();
scan.setCaching(1);
return ASYNC_CONN.getTable(tableName).getScanner(scan);
}
private void putToTable(Table ht, byte[] rowkey) throws IOException {
Put put = new Put(rowkey);
put.addColumn(FAMILY, QUALIFIER, VALUE);
ht.put(put);
}
private static class RegionServerWithScanTimeout
extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public RegionServerWithScanTimeout(Configuration conf)
throws IOException, InterruptedException {
super(conf);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new RSRpcServicesWithScanTimeout(this);
}
}
private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
private long tableScannerId;
private static long seqNoToThrowOn = -1;
private static boolean throwAlways = false;
private static boolean threw;
private static long seqNoToSleepOn = -1;
private static boolean sleepOnOpen = false;
private static volatile boolean slept;
private static int tryNumber = 0;
private static int sleepTime = rpcTimeout + 500;
public static void setSleepForTimeout(int timeout) {
sleepTime = timeout + 500;
}
public static void reset() {
setSleepForTimeout(scanTimeout);
seqNoToSleepOn = -1;
seqNoToThrowOn = -1;
throwAlways = false;
threw = false;
sleepOnOpen = false;
slept = false;
tryNumber = 0;
}
public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
super(rs);
}
@Override
public ScanResponse scan(final RpcController controller, final ScanRequest request)
throws ServiceException {
if (request.hasScannerId()) {
LOG.info("Got request {}", request);
ScanResponse scanResponse = super.scan(controller, request);
if (tableScannerId != request.getScannerId() || request.getCloseScanner()) {
return scanResponse;
}
if (
throwAlways
|| (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq())
) {
threw = true;
tryNumber++;
LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber,
tableScannerId);
throw new ServiceException(new OutOfOrderScannerNextException());
}
if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) {
try {
LOG.info("SLEEPING " + sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
}
slept = true;
tryNumber++;
}
return scanResponse;
} else {
ScanResponse scanRes = super.scan(controller, request);
String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
tableScannerId = scanRes.getScannerId();
if (sleepOnOpen) {
try {
LOG.info("openScanner SLEEPING " + sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
}
}
}
return scanRes;
}
}
}
}

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
@ -90,6 +92,7 @@ public class TestScannerHeartbeatMessages {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Table TABLE = null;
private static Connection CONN = null;
/**
* Table configuration
@ -133,6 +136,7 @@ public class TestScannerHeartbeatMessages {
conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
// setting these here for usage on the server side. will override for client side below
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
@ -141,6 +145,10 @@ public class TestScannerHeartbeatMessages {
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
TEST_UTIL.startMiniCluster(1);
// set client timeout for client side, we want it to be less than server side.
Configuration clientConf = new Configuration(conf);
clientConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
CONN = ConnectionFactory.createConnection(clientConf);
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
}
@ -180,10 +188,10 @@ public class TestScannerHeartbeatMessages {
static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
byte[][] qualifiers, byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families);
TEST_UTIL.createTable(name, families);
Table ht = CONN.getTable(name);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
return ht;
}
@ -211,6 +219,7 @@ public class TestScannerHeartbeatMessages {
@AfterClass
public static void tearDownAfterClass() throws Exception {
CONN.close();
TEST_UTIL.shutdownMiniCluster();
}