diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index ffccc127914..5fb95ebbd87 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PE import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; @@ -57,6 +59,9 @@ class AsyncConnectionConfiguration { // timeout for each read rpc request private final long readRpcTimeoutNs; + // timeout for each read rpc request against system tables + private final long metaReadRpcTimeoutNs; + // timeout for each write rpc request private final long writeRpcTimeoutNs; @@ -74,6 +79,7 @@ class AsyncConnectionConfiguration { // client that it is still alive. The scan timeout is used as operation timeout for every // operations in a scan, such as openScanner or next. private final long scanTimeoutNs; + private final long metaScanTimeoutNs; private final int scannerCaching; @@ -111,8 +117,11 @@ class AsyncConnectionConfiguration { TimeUnit.MILLISECONDS.toNanos(connectionConf.getMetaOperationTimeout()); this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getOperationTimeout()); this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getRpcTimeout()); - this.readRpcTimeoutNs = TimeUnit.MILLISECONDS - .toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, connectionConf.getReadRpcTimeout())); + long readRpcTimeoutMillis = + conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, connectionConf.getRpcTimeout()); + this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis); + this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis)); this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS .toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, connectionConf.getWriteRpcTimeout())); this.pauseNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getPauseMillis()); @@ -124,8 +133,11 @@ class AsyncConnectionConfiguration { TimeUnit.MICROSECONDS.toNanos(connectionConf.getReplicaCallTimeoutMicroSecondScan()); this.primaryMetaScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(connectionConf.getMetaReplicaCallTimeoutMicroSecondScan()); - this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf - .getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); + long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis); + this.metaScanTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis)); // fields not in connection configuration this.startLogErrorsCnt = @@ -150,6 +162,10 @@ class AsyncConnectionConfiguration { return 
readRpcTimeoutNs; } + long getMetaReadRpcTimeoutNs() { + return metaReadRpcTimeoutNs; + } + long getWriteRpcTimeoutNs() { return writeRpcTimeoutNs; } @@ -174,6 +190,10 @@ class AsyncConnectionConfiguration { return scanTimeoutNs; } + long getMetaScanTimeoutNs() { + return metaScanTimeoutNs; + } + int getScannerCaching() { return scannerCaching; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 7c58e8c672f..624d6e1dbb0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -55,9 +55,12 @@ abstract class AsyncTableBuilderBase this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs() : connConf.getOperationTimeoutNs(); - this.scanTimeoutNs = connConf.getScanTimeoutNs(); + this.scanTimeoutNs = + tableName.isSystemTable() ? connConf.getMetaScanTimeoutNs() : connConf.getScanTimeoutNs(); this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); - this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); + this.readRpcTimeoutNs = tableName.isSystemTable() + ? connConf.getMetaReadRpcTimeoutNs() + : connConf.getReadRpcTimeoutNs(); this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs(); this.pauseNs = connConf.getPauseNs(); this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index 928dd957e12..6c221590500 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -63,10 +63,10 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, - RpcControllerFactory rpcControllerFactory, ExecutorService pool, - int replicaCallTimeoutMicroSecondScan) throws IOException { + RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, + int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - replicaCallTimeoutMicroSecondScan); + scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan); exceptionsQueue = new ConcurrentLinkedQueue<>(); final Context context = Context.current(); final Runnable runnable = context.wrap(new PrefetchRunnable()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index dbc03fce1d5..1d656873bbe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -75,6 +75,7 @@ public abstract class ClientScanner extends AbstractClientScanner { protected final long maxScannerResultSize; private final ClusterConnection connection; protected final TableName tableName; + protected final int readRpcTimeout; protected final int scannerTimeout; protected boolean scanMetricsPublished = false; 
protected RpcRetryingCaller caller; @@ -100,8 +101,8 @@ public abstract class ClientScanner extends AbstractClientScanner { */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { + RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, + int scannerTimeout, int primaryOperationTimeout) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace( "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); @@ -120,8 +121,8 @@ public abstract class ClientScanner extends AbstractClientScanner { this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); } - this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + this.readRpcTimeout = scanReadRpcTimeout; + this.scannerTimeout = scannerTimeout; // check if application wants to collect scan metrics initScanMetrics(scan); @@ -248,9 +249,9 @@ public abstract class ClientScanner extends AbstractClientScanner { // clear the current region, we will set a new value to it after the first call of the new // callable. this.currentRegion = null; - this.callable = - new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, - primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller); + this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(), + createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout, + scannerTimeout, caching, conf, caller); this.callable.setCaching(this.caching); incRegionCountMetrics(scanMetrics); return true; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java index a1530c9bb7c..51d1bb1b228 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -36,10 +36,10 @@ import org.apache.yetus.audience.InterfaceAudience; public class ClientSimpleScanner extends ClientScanner { public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, - RpcControllerFactory rpcControllerFactory, ExecutorService pool, - int replicaCallTimeoutMicroSecondScan) throws IOException { + RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, + int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - replicaCallTimeoutMicroSecondScan); + scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index c8a283d0869..93fa2600d89 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -71,6 +71,11 @@ public class ConnectionConfiguration { HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); } + public static final String HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY = + "hbase.client.meta.read.rpc.timeout"; + public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT = + "hbase.client.meta.scanner.timeout.period"; + private final long writeBufferSize; private final long writeBufferPeriodicFlushTimeoutMs; private final long writeBufferPeriodicFlushTimerTickMs; @@ -85,7 +90,11 @@ public class ConnectionConfiguration { private final int maxKeyValueSize; private final int rpcTimeout; private final int readRpcTimeout; + private final int metaReadRpcTimeout; private final int writeRpcTimeout; + private final int scanTimeout; + private final int metaScanTimeout; + // toggle for async/sync prefetch private final boolean clientScannerAsyncPrefetch; private final long pauseMs; @@ -140,9 +149,16 @@ public class ConnectionConfiguration { this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.metaReadRpcTimeout = conf.getInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeout); + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.scanTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + + this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout); + long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); @@ -178,8 +194,11 @@ public class ConnectionConfiguration { this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.scanTimeout = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; + this.metaScanTimeout = scanTimeout; this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE; this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE; } @@ -188,6 +207,10 @@ public class ConnectionConfiguration { return readRpcTimeout; } + public int getMetaReadRpcTimeout() { + return metaReadRpcTimeout; + } + public int getWriteRpcTimeout() { return writeRpcTimeout; } @@ -248,6 +271,14 @@ public class ConnectionConfiguration { return rpcTimeout; } + public int getScanTimeout() { + return scanTimeout; + } + + public int getMetaScanTimeout() { + return metaScanTimeout; + } + public long getPauseMillis() { return pauseMs; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 62317224367..bf97b71b137 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -968,7 +968,8 @@ public class ConnectionImplementation implements 
ClusterConnection, Closeable { try (Scope ignored = span.makeCurrent(); ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, - rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) { + rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(), + connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond)) { boolean tableNotFound = true; for (;;) { Result regionInfoRow = rcs.next(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index f8fa218fe59..fcc928b5c63 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -120,6 +120,9 @@ public class HTable implements Table { private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX private int readRpcTimeoutMs; // timeout for each read rpc request private int writeRpcTimeoutMs; // timeout for each write rpc request + + private final int scanReadRpcTimeout; + private final int scanTimeout; private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final HRegionLocator locator; @@ -191,6 +194,8 @@ public class HTable implements Table { this.rpcTimeoutMs = builder.rpcTimeout; this.readRpcTimeoutMs = builder.readRpcTimeout; this.writeRpcTimeoutMs = builder.writeRpcTimeout; + this.scanReadRpcTimeout = builder.scanReadRpcTimeout; + this.scanTimeout = builder.scanTimeout; this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); @@ -312,18 +317,21 @@ public class HTable implements Table { final boolean async = scan.isAsyncPrefetch() != null ? 
scan.isAsyncPrefetch() : connConfiguration.isClientScannerAsyncPrefetch(); - final int timeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan(); + final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan(); if (scan.isReversed()) { return new ReversedClientScanner(getConfiguration(), scan, getName(), connection, - rpcCallerFactory, rpcControllerFactory, pool, timeout); + rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, + replicaTimeout); } else { if (async) { return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection, - rpcCallerFactory, rpcControllerFactory, pool, timeout); + rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, + replicaTimeout); } else { return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection, - rpcCallerFactory, rpcControllerFactory, pool, timeout); + rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, + replicaTimeout); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java index 7a2c4b160c1..9746548222c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java @@ -53,15 +53,16 @@ public class ResultBoundedCompletionService { private T result = null; private ExecutionException exeEx = null; private volatile boolean cancelled = false; - private final int callTimeout; + private final int operationTimeout; private final RpcRetryingCaller retryingCaller; private boolean resultObtained = false; private final int replicaId; // replica id - public QueueingFuture(RetryingCallable future, int callTimeout, int id) { + public QueueingFuture(RetryingCallable future, int rpcTimeout, int operationTimeout, + int id) { this.future = future; - this.callTimeout = callTimeout; - this.retryingCaller = retryingCallerFactory. newCaller(); + this.operationTimeout = operationTimeout; + this.retryingCaller = retryingCallerFactory. 
newCaller(rpcTimeout); this.replicaId = id; } @@ -70,7 +71,7 @@ public class ResultBoundedCompletionService { public void run() { try { if (!cancelled) { - result = this.retryingCaller.callWithRetries(future, callTimeout); + result = this.retryingCaller.callWithRetries(future, operationTimeout); resultObtained = true; } } catch (Throwable t) { @@ -157,8 +158,8 @@ public class ResultBoundedCompletionService { this.completedTasks = new ArrayList<>(maxTasks); } - public void submit(RetryingCallable task, int callTimeout, int id) { - QueueingFuture newFuture = new QueueingFuture<>(task, callTimeout, id); + public void submit(RetryingCallable task, int rpcTimeout, int operationTimeout, int id) { + QueueingFuture newFuture = new QueueingFuture<>(task, rpcTimeout, operationTimeout, id); // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable executor.execute(newFuture); tasks[id] = newFuture; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 44ad82a38cd..8b6e6e46d17 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -38,10 +38,10 @@ public class ReversedClientScanner extends ClientScanner { */ public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { + RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, + int scannerTimeout, int primaryOperationTimeout) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); + scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 902993c2bd8..5b2208c1cc7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -305,7 +305,7 @@ public class RpcRetryingCallerWithReadReplicas { for (int id = min; id <= max; id++) { HRegionLocation hrl = rl.getRegionLocation(id); ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); - cs.submit(callOnReplica, operationTimeout, id); + cs.submit(callOnReplica, rpcTimeout, operationTimeout, id); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 05cb850e337..21653214ce6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -64,14 +64,15 @@ class ScannerCallableWithReplicas implements RetryingCallable { private final RpcRetryingCaller caller; private final TableName tableName; private Configuration conf; - private int scannerTimeout; + private final int scannerTimeout; + private final int 
readRpcTimeout; private Set outstandingCallables = new HashSet<>(); private boolean someRPCcancelled = false; // required for testing purposes only private int regionReplication = 0; public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, - int retries, int scannerTimeout, int caching, Configuration conf, + int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf, RpcRetryingCaller caller) { this.currentScannerCallable = baseCallable; this.cConnection = cConnection; @@ -84,6 +85,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { this.retries = retries; this.tableName = tableName; this.conf = conf; + this.readRpcTimeout = readRpcTimeout; this.scannerTimeout = scannerTimeout; this.caller = caller; } @@ -323,7 +325,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { addCallsForCurrentReplica(ResultBoundedCompletionService> cs) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); outstandingCallables.add(currentScannerCallable); - cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); + cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id); } private void addCallsForOtherReplicas( @@ -337,7 +339,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { setStartRowForReplicaCallable(s); outstandingCallables.add(s); RetryingRPC retryingOnReplica = new RetryingRPC(s); - cs.submit(retryingOnReplica, scannerTimeout, id); + cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java index 796fd6496f7..43d7a1894ea 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java @@ -35,6 +35,8 @@ abstract class TableBuilderBase implements TableBuilder { protected int readRpcTimeout; protected int writeRpcTimeout; + protected final int scanReadRpcTimeout; + protected int scanTimeout; TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) { if (tableName == null) { @@ -46,6 +48,10 @@ abstract class TableBuilderBase implements TableBuilder { : connConf.getOperationTimeout(); this.rpcTimeout = connConf.getRpcTimeout(); this.readRpcTimeout = connConf.getReadRpcTimeout(); + this.scanReadRpcTimeout = + tableName.isSystemTable() ? connConf.getMetaReadRpcTimeout() : readRpcTimeout; + this.scanTimeout = + tableName.isSystemTable() ? 
connConf.getMetaScanTimeout() : connConf.getScanTimeout(); this.writeRpcTimeout = connConf.getWriteRpcTimeout(); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index c2c2bb3f98d..38776e337fe 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.RegionLocations; @@ -107,7 +108,8 @@ public class TestClientScanner { RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); + HConstants.DEFAULT_HBASE_RPC_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout); } @Override @@ -500,7 +502,7 @@ public class TestClientScanner { } @Override - public RpcRetryingCaller newCaller() { + public RpcRetryingCaller newCaller(int rpcTimeout) { return new RpcRetryingCaller() { @Override public void cancel() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java deleted file mode 100644 index 649fce1c750..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RSRpcServices; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; - -/** - * Test the scenario where a HRegionServer#scan() call, while scanning, timeout at client side and - * getting retried. This scenario should not result in some data being skipped at RS side. - */ -@Category({ MediumTests.class, ClientTests.class }) -public class TestClientScannerRPCTimeout { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestClientScannerRPCTimeout.class); - - private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerRPCTimeout.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final byte[] FAMILY = Bytes.toBytes("testFamily"); - private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); - private static final byte[] VALUE = Bytes.toBytes("testValue"); - private static final int rpcTimeout = 2 * 1000; - private static final int CLIENT_RETRIES_NUMBER = 3; - - @Rule - public TestName name = new TestName(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - // Don't report so often so easier to see other rpcs - conf.setInt("hbase.regionserver.msginterval", 3 * 10000); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); - conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER); - conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000); - TEST_UTIL.startMiniCluster(1); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Test - public void testScannerNextRPCTimesout() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); - Table ht = TEST_UTIL.createTable(tableName, FAMILY); - byte[] r0 = Bytes.toBytes("row-0"); - byte[] r1 = Bytes.toBytes("row-1"); - byte[] r2 = Bytes.toBytes("row-2"); - byte[] r3 = Bytes.toBytes("row-3"); - putToTable(ht, r0); - putToTable(ht, r1); - putToTable(ht, r2); - putToTable(ht, r3); - LOG.info("Wrote our three values"); - 
RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1; - Scan scan = new Scan(); - scan.setCaching(1); - ResultScanner scanner = ht.getScanner(scan); - Result result = scanner.next(); - // fetched when openScanner - assertTrue("Expected row: row-0", Bytes.equals(r0, result.getRow())); - result = scanner.next(); - assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow())); - LOG.info("Got expected first row"); - long t1 = EnvironmentEdgeManager.currentTime(); - result = scanner.next(); - assertTrue((EnvironmentEdgeManager.currentTime() - t1) > rpcTimeout); - assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow())); - RSRpcServicesWithScanTimeout.seqNoToSleepOn = -1;// No need of sleep - result = scanner.next(); - assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow())); - scanner.close(); - - // test the case that RPC is always timesout - scanner = ht.getScanner(scan); - RSRpcServicesWithScanTimeout.sleepAlways = true; - RSRpcServicesWithScanTimeout.tryNumber = 0; - try { - result = scanner.next(); - } catch (IOException ioe) { - // catch the exception after max retry number - LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe); - } - assertTrue( - "Expected maximal try number=" + CLIENT_RETRIES_NUMBER + ", actual =" - + RSRpcServicesWithScanTimeout.tryNumber, - RSRpcServicesWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER); - } - - private void putToTable(Table ht, byte[] rowkey) throws IOException { - Put put = new Put(rowkey); - put.addColumn(FAMILY, QUALIFIER, VALUE); - ht.put(put); - } - - private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer { - public RegionServerWithScanTimeout(Configuration conf) - throws IOException, InterruptedException { - super(conf); - } - - @Override - protected RSRpcServices createRpcServices() throws IOException { - return new RSRpcServicesWithScanTimeout(this); - } - } - - private static class RSRpcServicesWithScanTimeout extends RSRpcServices { - private long tableScannerId; - private boolean slept; - private static long seqNoToSleepOn = -1; - private static boolean sleepAlways = false; - private static int tryNumber = 0; - - public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException { - super(rs); - } - - @Override - public ScanResponse scan(final RpcController controller, final ScanRequest request) - throws ServiceException { - if (request.hasScannerId()) { - ScanResponse scanResponse = super.scan(controller, request); - if ( - this.tableScannerId == request.getScannerId() - && (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq())) - ) { - try { - LOG.info("SLEEPING " + (rpcTimeout + 500)); - Thread.sleep(rpcTimeout + 500); - } catch (InterruptedException e) { - } - slept = true; - tryNumber++; - if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) { - sleepAlways = false; - } - } - return scanResponse; - } else { - ScanResponse scanRes = super.scan(controller, request); - String regionName = Bytes.toString(request.getRegion().getValue().toByteArray()); - if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) { - tableScannerId = scanRes.getScannerId(); - } - return scanRes; - } - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java new file mode 100644 index 00000000000..2bff2297ff5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java 
@@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestClientScannerTimeouts { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestClientScannerTimeouts.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static AsyncConnection ASYNC_CONN; + private static Connection CONN; + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + + private static final byte[] ROW0 = Bytes.toBytes("row-0"); + private static final byte[] ROW1 = 
Bytes.toBytes("row-1"); + private static final byte[] ROW2 = Bytes.toBytes("row-2"); + private static final byte[] ROW3 = Bytes.toBytes("row-3"); + private static final int rpcTimeout = 1000; + private static final int scanTimeout = 3 * rpcTimeout; + private static final int metaScanTimeout = 6 * rpcTimeout; + private static final int CLIENT_RETRIES_NUMBER = 3; + + private static TableName tableName; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + // Don't report so often so easier to see other rpcs + conf.setInt("hbase.regionserver.msginterval", 3 * 10000); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000); + TEST_UTIL.startMiniCluster(1); + + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout); + conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout); + conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get(); + CONN = ConnectionFactory.createConnection(conf); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + CONN.close(); + ASYNC_CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + public void setup(boolean isSystemTable) throws IOException { + RSRpcServicesWithScanTimeout.reset(); + + String nameAsString = name.getMethodName(); + if (isSystemTable) { + nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString; + } + tableName = TableName.valueOf(nameAsString); + TEST_UTIL.createTable(tableName, FAMILY); + + Table table = CONN.getTable(tableName); + putToTable(table, ROW0); + putToTable(table, ROW1); + putToTable(table, ROW2); + putToTable(table, ROW3); + LOG.info("Wrote our four values"); + + table.getRegionLocator().getAllRegionLocations(); + + // reset again incase the creation/population caused anything to trigger + RSRpcServicesWithScanTimeout.reset(); + } + + private void expectRow(byte[] expected, Result result) { + assertTrue("Expected row: " + Bytes.toString(expected), + Bytes.equals(expected, result.getRow())); + } + + private void expectNumTries(int expected) { + assertEquals( + "Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber, + expected, RSRpcServicesWithScanTimeout.tryNumber); + // reset for next + RSRpcServicesWithScanTimeout.tryNumber = 0; + } + + /** + * verify that we don't miss any data when encountering an OutOfOrderScannerNextException. + * Typically, the only way to naturally trigger this is if a client-side timeout causes an + * erroneous next() call. This is relatively hard to do these days because the server attempts to + * always return before the timeout. In this test we force the server to throw this exception, so + * that we can test the retry logic appropriately. 
+ */ + @Test + public void testRetryOutOfOrderScannerNextException() throws IOException { + expectRetryOutOfOrderScannerNext(() -> getScanner(CONN)); + } + + /** + * AsyncTable version of above + */ + @Test + public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException { + expectRetryOutOfOrderScannerNext(this::getAsyncScanner); + } + + /** + * verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for normal scans. Use a + * special connection which has retries disabled, because otherwise the scanner will retry the + * timed out next() call and mess up the test. + */ + @Test + public void testNormalScanTimeoutOnNext() throws IOException { + setup(false); + // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and + // will retry until scannerTimeout. This makes it more difficult to test the timeouts + // of normal next() calls. So we use a separate connection here which has retries disabled. + Configuration confNoRetries = new Configuration(CONN.getConfiguration()); + confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) { + expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn)); + } + } + + /** + * AsyncTable version of above + */ + @Test + public void testNormalScanTimeoutOnNextAsync() throws IOException { + setup(false); + expectTimeoutOnNext(scanTimeout, this::getAsyncScanner); + } + + /** + * verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for openScanner() calls for + * meta scans + */ + @Test + public void testNormalScanTimeoutOnOpenScanner() throws IOException { + setup(false); + expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner); + } + + /** + * AsyncTable version of above + */ + @Test + public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException { + setup(false); + expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner); + } + + /** + * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for + * next() calls in meta scans + */ + @Test + public void testMetaScanTimeoutOnNext() throws IOException { + setup(true); + expectTimeoutOnNext(metaScanTimeout, this::getScanner); + } + + /** + * AsyncTable version of above + */ + @Test + public void testMetaScanTimeoutOnNextAsync() throws IOException { + setup(true); + expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner); + } + + /** + * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for + * openScanner() calls for meta scans + */ + @Test + public void testMetaScanTimeoutOnOpenScanner() throws IOException { + setup(true); + expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner); + } + + /** + * AsyncTable version of above + */ + @Test + public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException { + setup(true); + expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner); + } + + private void expectRetryOutOfOrderScannerNext(Supplier scannerSupplier) + throws IOException { + setup(false); + RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1; + + LOG.info( + "Opening scanner, expecting no errors from first next() call from openScanner response"); + ResultScanner scanner = scannerSupplier.get(); + Result result = scanner.next(); + expectRow(ROW0, result); + expectNumTries(0); + + LOG.info("Making first next() RPC, expecting no errors for seqNo 0"); + result = scanner.next(); + expectRow(ROW1, result); + expectNumTries(0); + + LOG.info( + "Making second 
next() RPC, expecting OutOfOrderScannerNextException and appropriate retry"); + result = scanner.next(); + expectRow(ROW2, result); + expectNumTries(1); + + // reset so no errors. since last call restarted the scan and following + // call would otherwise fail + RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1; + + LOG.info("Finishing scan, expecting no errors"); + result = scanner.next(); + expectRow(ROW3, result); + scanner.close(); + + LOG.info("Testing always throw exception"); + byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 }; + int i = 0; + + // test the case that RPC always throws + scanner = scannerSupplier.get(); + RSRpcServicesWithScanTimeout.throwAlways = true; + + while (true) { + LOG.info("Calling scanner.next()"); + result = scanner.next(); + if (result == null) { + break; + } else { + byte[] expectedResult = expectedResults[i++]; + expectRow(expectedResult, result); + } + } + + // ensure we verified all rows. this along with the expectRow check above + // proves that we didn't miss any rows. + assertEquals("Expected to exhaust expectedResults array length=" + expectedResults.length + + ", actual index=" + i, expectedResults.length, i); + + // expect all but the first row (which came from initial openScanner) to have thrown an error + expectNumTries(expectedResults.length - 1); + + } + + private void expectTimeoutOnNext(int timeout, Supplier scannerSupplier) + throws IOException { + RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1; + RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); + + LOG.info( + "Opening scanner, expecting no timeouts from first next() call from openScanner response"); + ResultScanner scanner = scannerSupplier.get(); + Result result = scanner.next(); + expectRow(ROW0, result); + + LOG.info("Making first next() RPC, expecting no timeout for seqNo 0"); + result = scanner.next(); + expectRow(ROW1, result); + + LOG.info("Making second next() RPC, expecting timeout"); + long start = System.nanoTime(); + try { + scanner.next(); + fail("Expected CallTimeoutException"); + } catch (RetriesExhaustedException e) { + assertTrue("Expected CallTimeoutException", e.getCause() instanceof CallTimeoutException + || e.getCause() instanceof SocketTimeoutException); + } + expectTimeout(start, timeout); + } + + private void expectTimeoutOnOpenScanner(int timeout, Supplier scannerSupplier) + throws IOException { + RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); + RSRpcServicesWithScanTimeout.sleepOnOpen = true; + LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response"); + long start = System.nanoTime(); + try { + scannerSupplier.get().next(); + fail("Expected SocketTimeoutException or CallTimeoutException"); + } catch (RetriesExhaustedException e) { + LOG.info("Got error", e); + assertTrue("Expected SocketTimeoutException or CallTimeoutException, but was " + e.getCause(), + e.getCause() instanceof CallTimeoutException + || e.getCause() instanceof SocketTimeoutException); + } + expectTimeout(start, timeout); + } + + private void expectTimeout(long start, int timeout) { + long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + LOG.info("Expected duration >= {}, and got {}", timeout, duration); + assertTrue("Expected duration >= " + timeout + ", but was " + duration, duration >= timeout); + } + + private ResultScanner getScanner() { + return getScanner(CONN); + } + + private ResultScanner getScanner(Connection conn) { + Scan scan = new Scan(); + scan.setCaching(1); + try { + return 
conn.getTable(tableName).getScanner(scan); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ResultScanner getAsyncScanner() { + Scan scan = new Scan(); + scan.setCaching(1); + return ASYNC_CONN.getTable(tableName).getScanner(scan); + } + + private void putToTable(Table ht, byte[] rowkey) throws IOException { + Put put = new Put(rowkey); + put.addColumn(FAMILY, QUALIFIER, VALUE); + ht.put(put); + } + + private static class RegionServerWithScanTimeout + extends MiniHBaseCluster.MiniHBaseClusterRegionServer { + public RegionServerWithScanTimeout(Configuration conf) + throws IOException, InterruptedException { + super(conf); + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new RSRpcServicesWithScanTimeout(this); + } + } + + private static class RSRpcServicesWithScanTimeout extends RSRpcServices { + private long tableScannerId; + + private static long seqNoToThrowOn = -1; + private static boolean throwAlways = false; + private static boolean threw; + + private static long seqNoToSleepOn = -1; + private static boolean sleepOnOpen = false; + private static volatile boolean slept; + private static int tryNumber = 0; + + private static int sleepTime = rpcTimeout + 500; + + public static void setSleepForTimeout(int timeout) { + sleepTime = timeout + 500; + } + + public static void reset() { + setSleepForTimeout(scanTimeout); + + seqNoToSleepOn = -1; + seqNoToThrowOn = -1; + throwAlways = false; + threw = false; + sleepOnOpen = false; + slept = false; + tryNumber = 0; + } + + public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + public ScanResponse scan(final RpcController controller, final ScanRequest request) + throws ServiceException { + if (request.hasScannerId()) { + LOG.info("Got request {}", request); + ScanResponse scanResponse = super.scan(controller, request); + if (tableScannerId != request.getScannerId() || request.getCloseScanner()) { + return scanResponse; + } + + if ( + throwAlways + || (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq()) + ) { + threw = true; + tryNumber++; + LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber, + tableScannerId); + throw new ServiceException(new OutOfOrderScannerNextException()); + } + + if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) { + try { + LOG.info("SLEEPING " + sleepTime); + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + slept = true; + tryNumber++; + } + return scanResponse; + } else { + ScanResponse scanRes = super.scan(controller, request); + String regionName = Bytes.toString(request.getRegion().getValue().toByteArray()); + if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) { + tableScannerId = scanRes.getScannerId(); + if (sleepOnOpen) { + try { + LOG.info("openScanner SLEEPING " + sleepTime); + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + } + } + return scanRes; + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index d9b1314b09c..f5debcb627a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -41,6 +41,8 @@ import 
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTestConst; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; @@ -90,6 +92,7 @@ public class TestScannerHeartbeatMessages { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static Table TABLE = null; + private static Connection CONN = null; /** * Table configuration @@ -133,6 +136,7 @@ public class TestScannerHeartbeatMessages { conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); + // setting these here for usage on the server side. will override for client side below conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); @@ -141,6 +145,10 @@ public class TestScannerHeartbeatMessages { conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); TEST_UTIL.startMiniCluster(1); + // set client timeout for client side, we want it to be less than server side. + Configuration clientConf = new Configuration(conf); + clientConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); + CONN = ConnectionFactory.createConnection(clientConf); TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); } @@ -180,10 +188,10 @@ public class TestScannerHeartbeatMessages { static Table createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers, byte[] cellValue) throws IOException { - Table ht = TEST_UTIL.createTable(name, families); + TEST_UTIL.createTable(name, families); + Table ht = CONN.getTable(name); List puts = createPuts(rows, families, qualifiers, cellValue); ht.put(puts); - ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); return ht; } @@ -211,6 +219,7 @@ public class TestScannerHeartbeatMessages { @AfterClass public static void tearDownAfterClass() throws Exception { + CONN.close(); TEST_UTIL.shutdownMiniCluster(); }
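
For illustration, a minimal usage sketch of the settings this patch adds (an example, not part of the change itself): it assumes only the two property strings defined above in ConnectionConfiguration ("hbase.client.meta.read.rpc.timeout" and "hbase.client.meta.scanner.timeout.period") plus the standard HBase client API. When the meta-specific keys are unset, both the sync and async paths fall back to the regular read RPC timeout and scanner timeout, so existing configurations keep their current behavior.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MetaScanTimeoutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Timeouts for scans against ordinary user tables.
    conf.setInt("hbase.rpc.read.timeout", 10_000);              // HConstants.HBASE_RPC_READ_TIMEOUT_KEY
    conf.setInt("hbase.client.scanner.timeout.period", 60_000); // HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
    // More generous limits for scans against hbase:meta and other system tables,
    // using the keys introduced by this patch.
    conf.setInt("hbase.client.meta.read.rpc.timeout", 30_000);
    conf.setInt("hbase.client.meta.scanner.timeout.period", 120_000);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Scanners created from this connection pick up the meta-specific values for
      // system tables (see TableBuilderBase / AsyncTableBuilderBase above) and the
      // regular values for user tables.
    }
  }
}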