setRetryPauseForCQTBE(long pause, TimeUnit unit);
+
/**
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.
*
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
index ee571f10241..399d9ddfaff 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
@@ -45,6 +45,8 @@ abstract class AsyncTableBuilderBase
protected long pauseNs;
+ protected long pauseForCQTBENs;
+
protected int maxAttempts;
protected int startLogErrorsCnt;
@@ -58,6 +60,7 @@ abstract class AsyncTableBuilderBase
this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
this.pauseNs = connConf.getPauseNs();
+ this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
this.maxAttempts = retries2Attempts(connConf.getMaxRetries());
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
@@ -98,6 +101,12 @@ abstract class AsyncTableBuilderBase
return this;
}
+ @Override
+ public AsyncTableBuilderBase setRetryPauseForCQTBE(long pause, TimeUnit unit) {
+ this.pauseForCQTBENs = unit.toNanos(pause); // kept in nanoseconds, whatever unit was passed
+ return this; // return the builder itself so that setters can be chained
+ }
+
@Override
public AsyncTableBuilderBase setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 35cb92276ab..0fd3cba797f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -327,6 +327,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private final long pauseNs;
+ private final long pauseForCQTBENs;
+
private final int maxAttempts;
private final int startLogErrorsCnt;
@@ -341,6 +343,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
this.rpcTimeoutNs = builder.rpcTimeoutNs;
this.operationTimeoutNs = builder.operationTimeoutNs;
this.pauseNs = builder.pauseNs;
+ if (builder.pauseForCQTBENs < builder.pauseNs) {
+ LOG.warn(
+ "Configured value of pauseForCQTBENs is {} ms, which is less than" +
+ " the normal pause value {} ms, use the greater one instead",
+ TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
+ TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
+ this.pauseForCQTBENs = builder.pauseNs;
+ } else {
+ this.pauseForCQTBENs = builder.pauseForCQTBENs;
+ }
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.ng = connection.getNonceGenerator();
@@ -348,18 +360,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private MasterRequestCallerBuilder newMasterCaller() {
return this.connection.callerFactory. masterRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) // all durations here are in nanoseconds
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
private AdminRequestCallerBuilder newAdminCaller() {
return this.connection.callerFactory. adminRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) // all durations here are in nanoseconds
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
@FunctionalInterface
@@ -3357,10 +3369,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private ServerRequestCallerBuilder newServerCaller() {
return this.connection.callerFactory. serverRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) // all durations here are in nanoseconds
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index b2ca3a90b8f..8050137ba7a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
@@ -77,6 +79,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
@InterfaceAudience.Private
class RawAsyncTableImpl implements AsyncTable {
+ private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
+
private final AsyncConnectionImpl conn;
private final Timer retryTimer;
@@ -99,6 +103,8 @@ class RawAsyncTableImpl implements AsyncTable {
private final long pauseNs;
+ private final long pauseForCQTBENs;
+
private final int maxAttempts;
private final int startLogErrorsCnt;
@@ -113,6 +119,16 @@ class RawAsyncTableImpl implements AsyncTable {
this.operationTimeoutNs = builder.operationTimeoutNs;
this.scanTimeoutNs = builder.scanTimeoutNs;
this.pauseNs = builder.pauseNs;
+ if (builder.pauseForCQTBENs < builder.pauseNs) {
+ LOG.warn(
+ "Configured value of pauseForCQTBENs is {} ms, which is less than" +
+ " the normal pause value {} ms, use the greater one instead",
+ TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
+ TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
+ this.pauseForCQTBENs = builder.pauseNs;
+ } else {
+ this.pauseForCQTBENs = builder.pauseForCQTBENs;
+ }
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
@@ -220,8 +236,8 @@ class RawAsyncTableImpl implements AsyncTable {
return conn.callerFactory. single().table(tableName).row(row).priority(priority)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
private SingleRequestCallerBuilder newCaller(
@@ -451,7 +467,8 @@ class RawAsyncTableImpl implements AsyncTable {
@Override
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
- pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+ pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
+ .start(); // the scanner is started right away; nothing is returned to the caller
}
private long resultSize2CacheSize(long maxResultSize) {
@@ -521,7 +538,8 @@ class RawAsyncTableImpl implements AsyncTable {
return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+ .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt).call();
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java
new file mode 100644
index 00000000000..075e1bc2212
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.CallRunner;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+
+/**
+ * Checks that the async client backs off for the dedicated
+ * {@link HConstants#HBASE_CLIENT_PAUSE_FOR_CQTBE} pause, rather than the much shorter
+ * {@link HConstants#HBASE_CLIENT_PAUSE}, before retrying a call that the server refused to queue.
+ * <p>
+ * The mini cluster is started with {@link CQTBERpcScheduler}, which refuses every other dispatch
+ * of each RPC method while {@code FAIL} is set, so the tested operations have to be retried.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncClientPauseForCallQueueTooBig {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("CQTBE");
+
+  private static final byte[] FAMILY = Bytes.toBytes("Family");
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("Qualifier");
+
+  /** Pause configured for rejected calls; much longer than the 10 ms normal pause. */
+  private static final long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1);
+
+  private static AsyncConnection CONN;
+
+  /**
+   * Switches call rejection on and off. It is written by the test thread and read inside
+   * {@link CQTBERpcScheduler#dispatch(CallRunner)}, so it is volatile to keep every change
+   * visible across threads.
+   */
+  private static volatile boolean FAIL = false;
+
+  /** Per RPC method count of the dispatch attempts seen while {@code FAIL} was set. */
+  private static final ConcurrentMap<MethodDescriptor, AtomicInteger> INVOKED =
+    new ConcurrentHashMap<>();
+
+  /**
+   * Scheduler that, while {@code FAIL} is set, refuses the 1st, 3rd, 5th, ... dispatch of each
+   * RPC method by returning {@code false}; every other call goes to {@link SimpleRpcScheduler}.
+   */
+  public static final class CQTBERpcScheduler extends SimpleRpcScheduler {
+
+    public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
+      int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
+      Abortable server, int highPriorityLevel) {
+      super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
+        metaTransitionHandler, priority, server, highPriorityLevel);
+    }
+
+    @Override
+    public boolean dispatch(CallRunner callTask) throws InterruptedException {
+      if (FAIL) {
+        MethodDescriptor method = callTask.getRpcCall().getMethod();
+        // this is for test scan, where we will send an open scanner first and then a next, and we
+        // expect that we hit CQTBE two times.
+        if (INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0) {
+          return false;
+        }
+      }
+      return super.dispatch(callTask);
+    }
+  }
+
+  /** Factory that builds a {@link CQTBERpcScheduler} from the configured handler counts. */
+  public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory {
+
+    @Override
+    public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
+      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+      return new CQTBERpcScheduler(conf, handlerCount,
+        conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
+        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
+        conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
+          HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
+        priority, server, HConstants.QOS_THRESHOLD);
+    }
+
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // normal pause: 10 ms, CQTBE pause: 1 s, so the two are easy to tell apart by elapsed time
+    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
+    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
+      TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS));
+    UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+      CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class);
+    UTIL.startMiniCluster(1);
+    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    Closeables.close(CONN, true);
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUpBeforeTest() throws IOException {
+    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
+      }
+    }
+    // only start refusing calls once the table has been created and loaded
+    FAIL = true;
+  }
+
+  @After
+  public void tearDownAfterTest() throws IOException {
+    // stop refusing calls and reset the counters before the table is dropped
+    FAIL = false;
+    INVOKED.clear();
+    UTIL.getAdmin().disableTable(TABLE_NAME);
+    UTIL.getAdmin().deleteTable(TABLE_NAME);
+  }
+
+  /** Runs {@code callable} and asserts that it took longer than {@code time} nanoseconds. */
+  private void assertTime(Callable<Void> callable, long time) throws Exception {
+    long startNs = System.nanoTime();
+    callable.call();
+    long costNs = System.nanoTime() - startNs;
+    assertTrue("Expected more than " + time + " ns, but took " + costNs + " ns", costNs > time);
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    assertTime(() -> {
+      Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get();
+      assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
+      return null;
+    }, PAUSE_FOR_CQTBE_NS);
+  }
+
+  @Test
+  public void testBatch() throws Exception {
+    assertTime(() -> {
+      List<CompletableFuture<Void>> futures = new ArrayList<>();
+      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
+        for (int i = 100; i < 110; i++) {
+          futures.add(mutator
+            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+        }
+      }
+      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+    }, PAUSE_FOR_CQTBE_NS);
+  }
+
+  @Test
+  public void testScan() throws Exception {
+    // we will hit CallQueueTooBigException two times so the sleep time should be twice
+    assertTime(() -> {
+      try (
+        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
+        for (int i = 0; i < 100; i++) {
+          Result result = scanner.next();
+          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
+        }
+        assertNull(scanner.next());
+      }
+      return null;
+    }, PAUSE_FOR_CQTBE_NS * 2);
+  }
+}