HBASE-27798: Client side should back off based on wait interval in RpcThrottlingException (#5226)

Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
Ray Mattingly 2023-06-05 17:54:55 -04:00 committed by Ray Mattingly
parent 13c6be0244
commit aa0c3f1e97
6 changed files with 476 additions and 55 deletions

View File

@ -35,6 +35,7 @@ import java.util.IdentityHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -56,9 +57,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -103,10 +104,6 @@ class AsyncBatchRpcRetryingCaller<T> {
private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors; private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
private final long pauseNs;
private final long pauseNsForServerOverloaded;
private final int maxAttempts; private final int maxAttempts;
private final long operationTimeoutNs; private final long operationTimeoutNs;
@ -117,6 +114,8 @@ class AsyncBatchRpcRetryingCaller<T> {
private final long startNs; private final long startNs;
private final HBaseServerExceptionPauseManager pauseManager;
// we can not use HRegionLocation as the map key because the hashCode and equals method of // we can not use HRegionLocation as the map key because the hashCode and equals method of
// HRegionLocation only consider serverName. // HRegionLocation only consider serverName.
private static final class RegionRequest { private static final class RegionRequest {
@ -156,8 +155,6 @@ class AsyncBatchRpcRetryingCaller<T> {
this.retryTimer = retryTimer; this.retryTimer = retryTimer;
this.conn = conn; this.conn = conn;
this.tableName = tableName; this.tableName = tableName;
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts; this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs; this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs;
@ -165,6 +162,7 @@ class AsyncBatchRpcRetryingCaller<T> {
this.actions = new ArrayList<>(actions.size()); this.actions = new ArrayList<>(actions.size());
this.futures = new ArrayList<>(actions.size()); this.futures = new ArrayList<>(actions.size());
this.action2Future = new IdentityHashMap<>(actions.size()); this.action2Future = new IdentityHashMap<>(actions.size());
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
for (int i = 0, n = actions.size(); i < n; i++) { for (int i = 0, n = actions.size(); i < n; i++) {
Row rawAction = actions.get(i); Row rawAction = actions.get(i);
Action action; Action action;
@ -476,19 +474,15 @@ class AsyncBatchRpcRetryingCaller<T> {
return; return;
} }
long delayNs; long delayNs;
long pauseNsToUse; boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error);
boolean isServerOverloaded = false; OptionalLong maybePauseNsToUse =
if (error instanceof RpcThrottlingException) { pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error; if (!maybePauseNsToUse.isPresent()) {
pauseNsToUse = rpcThrottlingException.getWaitInterval() * 1000; // wait interval is in millis failAll(actions, tries);
if (LOG.isDebugEnabled()) { return;
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException",
rpcThrottlingException.getWaitInterval(), rpcThrottlingException);
}
} else {
isServerOverloaded = HBaseServerException.isServerOverloaded(error);
pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
} }
long pauseNsToUse = maybePauseNsToUse.getAsLong();
if (operationTimeoutNs > 0) { if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) { if (maxDelayNs <= 0) {

View File

@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -35,9 +36,9 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -57,10 +58,6 @@ public abstract class AsyncRpcRetryingCaller<T> {
private final long startNs; private final long startNs;
private final long pauseNs;
private final long pauseNsForServerOverloaded;
private int tries = 1; private int tries = 1;
private final int maxAttempts; private final int maxAttempts;
@ -79,14 +76,14 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller; protected final HBaseRpcController controller;
private final HBaseServerExceptionPauseManager pauseManager;
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) { long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer; this.retryTimer = retryTimer;
this.conn = conn; this.conn = conn;
this.priority = priority; this.priority = priority;
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts; this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs; this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs;
@ -96,6 +93,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
this.controller.setPriority(priority); this.controller.setPriority(priority);
this.exceptions = new ArrayList<>(); this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime(); this.startNs = System.nanoTime();
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
} }
private long elapsedMs() { private long elapsedMs() {
@ -126,18 +124,14 @@ public abstract class AsyncRpcRetryingCaller<T> {
} }
private void tryScheduleRetry(Throwable error) { private void tryScheduleRetry(Throwable error) {
long pauseNsToUse; OptionalLong maybePauseNsToUse =
if (error instanceof RpcThrottlingException) { pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error; if (!maybePauseNsToUse.isPresent()) {
pauseNsToUse = rpcThrottlingException.getWaitInterval() * 1000; // wait interval is in millis completeExceptionally();
if (LOG.isDebugEnabled()) { return;
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException",
rpcThrottlingException.getWaitInterval(), rpcThrottlingException);
}
} else {
pauseNsToUse =
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
} }
long pauseNsToUse = maybePauseNsToUse.getAsLong();
long delayNs; long delayNs;
if (operationTimeoutNs > 0) { if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;

View File

@ -34,6 +34,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -43,11 +44,11 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer; import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -100,10 +101,6 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final long scannerLeaseTimeoutPeriodNs; private final long scannerLeaseTimeoutPeriodNs;
private final long pauseNs;
private final long pauseNsForServerOverloaded;
private final int maxAttempts; private final int maxAttempts;
private final long scanTimeoutNs; private final long scanTimeoutNs;
@ -132,6 +129,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private long nextCallSeq = -1L; private long nextCallSeq = -1L;
private final HBaseServerExceptionPauseManager pauseManager;
private enum ScanControllerState { private enum ScanControllerState {
INITIALIZED, INITIALIZED,
SUSPENDED, SUSPENDED,
@ -331,8 +330,6 @@ class AsyncScanSingleRegionRpcRetryingCaller {
this.loc = loc; this.loc = loc;
this.regionServerRemote = isRegionServerRemote; this.regionServerRemote = isRegionServerRemote;
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts; this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs; this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs;
@ -347,6 +344,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
this.controller = conn.rpcControllerFactory.newController(); this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority); this.controller.setPriority(priority);
this.exceptions = new ArrayList<>(); this.exceptions = new ArrayList<>();
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
} }
private long elapsedMs() { private long elapsedMs() {
@ -420,18 +418,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
return; return;
} }
long delayNs; long delayNs;
long pauseNsToUse; OptionalLong maybePauseNsToUse =
if (error instanceof RpcThrottlingException) { pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error; if (!maybePauseNsToUse.isPresent()) {
pauseNsToUse = rpcThrottlingException.getWaitInterval() * 1000; // wait interval is in millis completeExceptionally(!scannerClosed);
if (LOG.isDebugEnabled()) { return;
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException",
rpcThrottlingException.getWaitInterval(), rpcThrottlingException);
}
} else {
pauseNsToUse =
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
} }
long pauseNsToUse = maybePauseNsToUse.getAsLong();
if (scanTimeoutNs > 0) { if (scanTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) { if (maxDelayNs <= 0) {

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.backoff;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class HBaseServerExceptionPauseManager {
private static final Logger LOG = LoggerFactory.getLogger(HBaseServerExceptionPauseManager.class);
private final long pauseNs;
private final long pauseNsForServerOverloaded;
public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded) {
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
}
public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) {
long expectedSleepNs;
if (error instanceof RpcThrottlingException) {
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
if (expectedSleepNs > remainingTimeNs) {
return OptionalLong.empty();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleepNs,
rpcThrottlingException);
}
} else {
expectedSleepNs =
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
}
return OptionalLong.of(expectedSleepNs);
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.backoff;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ClientTests.class, SmallTests.class })
public class TestHBaseServerExceptionPauseManager {
private static final long WAIT_INTERVAL_MILLIS = 1L;
private static final long WAIT_INTERVAL_NANOS =
TimeUnit.MILLISECONDS.toNanos(WAIT_INTERVAL_MILLIS);
private static final long PAUSE_NANOS_FOR_SERVER_OVERLOADED = WAIT_INTERVAL_NANOS * 3;
private static final long PAUSE_NANOS = WAIT_INTERVAL_NANOS * 2;
private final RpcThrottlingException RPC_THROTTLING_EXCEPTION = new RpcThrottlingException(
RpcThrottlingException.Type.NumRequestsExceeded, WAIT_INTERVAL_MILLIS, "doot");
private final Throwable OTHER_EXCEPTION = new RuntimeException("");
private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true);
private final HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class);
@Test
public void itSupportsRpcThrottlingNanos() {
OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, Long.MAX_VALUE);
assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
}
@Test
public void itSupportsServerOverloadedExceptionNanos() {
OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, Long.MAX_VALUE);
assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS_FOR_SERVER_OVERLOADED);
}
@Test
public void itSupportsOtherExceptionNanos() {
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, Long.MAX_VALUE);
assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS);
}
@Test
public void itThrottledTimeoutFastFail() {
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 0L);
assertFalse(pauseNanos.isPresent());
}
}

View File

@ -0,0 +1,294 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncClientPauseForRpcThrottling {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncClientPauseForRpcThrottling.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static TableName TABLE_NAME = TableName.valueOf("RpcThrottling");
private static byte[] FAMILY = Bytes.toBytes("Family");
private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
private static AsyncConnection CONN;
private static final AtomicBoolean THROTTLE = new AtomicBoolean(false);
private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices {
public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException {
super(rs);
}
@Override
public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
throws ServiceException {
maybeThrottle();
return super.get(controller, request);
}
@Override
public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
throws ServiceException {
maybeThrottle();
return super.multi(rpcc, request);
}
@Override
public ClientProtos.ScanResponse scan(RpcController controller,
ClientProtos.ScanRequest request) throws ServiceException {
maybeThrottle();
return super.scan(controller, request);
}
private void maybeThrottle() throws ServiceException {
if (THROTTLE.get()) {
THROTTLE.set(false);
throw new ServiceException(new RpcThrottlingException("number of requests exceeded - wait "
+ TimeUnit.NANOSECONDS.toMillis(WAIT_INTERVAL_NANOS) + "ms"));
}
}
}
public static final class ThrottlingRegionServerForTest extends HRegionServer {
public ThrottlingRegionServerForTest(Configuration conf) throws IOException {
super(conf);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new ThrottlingRSRpcServicesForTest(this);
}
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
UTIL.startMiniCluster(1);
UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
ThrottlingRegionServerForTest.class, HRegionServer.class);
HRegionServer regionServer = UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer();
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
UTIL.waitTableAvailable(TABLE_NAME);
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
}
}
UTIL.getAdmin().move(UTIL.getAdmin().getRegions(TABLE_NAME).get(0).getEncodedNameAsBytes(),
regionServer.getServerName());
Configuration conf = new Configuration(UTIL.getConfiguration());
CONN = ConnectionFactory.createAsyncConnection(conf).get();
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.getAdmin().disableTable(TABLE_NAME);
UTIL.getAdmin().deleteTable(TABLE_NAME);
Closeables.close(CONN, true);
UTIL.shutdownMiniCluster();
}
private void assertTime(Callable<Void> callable, long time, boolean isGreater) throws Exception {
long startNs = System.nanoTime();
callable.call();
long costNs = System.nanoTime() - startNs;
if (isGreater) {
assertTrue(costNs > time);
} else {
assertTrue(costNs <= time);
}
}
@Test
public void itWaitsForThrottledGet() throws Exception {
boolean isThrottled = true;
THROTTLE.set(isThrottled);
AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
assertTime(() -> {
table.get(new Get(Bytes.toBytes(0))).get();
return null;
}, WAIT_INTERVAL_NANOS, isThrottled);
}
@Test
public void itDoesNotWaitForUnthrottledGet() throws Exception {
boolean isThrottled = false;
THROTTLE.set(isThrottled);
AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
assertTime(() -> {
table.get(new Get(Bytes.toBytes(0))).get();
return null;
}, WAIT_INTERVAL_NANOS, isThrottled);
}
@Test
public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception {
AsyncTable<AdvancedScanResultConsumer> table =
CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MILLISECONDS).build();
boolean isThrottled = true;
THROTTLE.set(isThrottled);
assertTime(() -> {
assertThrows(ExecutionException.class, () -> table.get(new Get(Bytes.toBytes(0))).get());
return null;
}, WAIT_INTERVAL_NANOS, false);
}
@Test
public void itWaitsForThrottledBatch() throws Exception {
boolean isThrottled = true;
THROTTLE.set(isThrottled);
assertTime(() -> {
List<CompletableFuture<?>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
for (int i = 100; i < 110; i++) {
futures.add(mutator
.mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
}
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
}, WAIT_INTERVAL_NANOS, isThrottled);
}
@Test
public void itDoesNotWaitForUnthrottledBatch() throws Exception {
boolean isThrottled = false;
THROTTLE.set(isThrottled);
assertTime(() -> {
List<CompletableFuture<?>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
for (int i = 100; i < 110; i++) {
futures.add(mutator
.mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
}
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
}, WAIT_INTERVAL_NANOS, isThrottled);
}
@Test
public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception {
boolean isThrottled = true;
THROTTLE.set(isThrottled);
assertTime(() -> {
List<CompletableFuture<?>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
.setOperationTimeout(1, TimeUnit.MILLISECONDS).build()) {
for (int i = 100; i < 110; i++) {
futures.add(mutator
.mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
}
}
assertThrows(ExecutionException.class,
() -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get());
return null;
}, WAIT_INTERVAL_NANOS, false);
}
@Test
public void itWaitsForThrottledScan() throws Exception {
boolean isThrottled = true;
THROTTLE.set(isThrottled);
assertTime(() -> {
try (
ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
for (int i = 0; i < 100; i++) {
Result result = scanner.next();
assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
}
}
return null;
}, WAIT_INTERVAL_NANOS, isThrottled);
}
@Test
public void itDoesNotWaitForUnthrottledScan() throws Exception {
boolean isThrottled = false;
THROTTLE.set(isThrottled);
assertTime(() -> {
try (
ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
for (int i = 0; i < 100; i++) {
Result result = scanner.next();
assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
}
}
return null;
}, WAIT_INTERVAL_NANOS, isThrottled);
}
@Test
public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception {
AsyncTable<AdvancedScanResultConsumer> table =
CONN.getTableBuilder(TABLE_NAME).setScanTimeout(1, TimeUnit.MILLISECONDS).build();
boolean isThrottled = true;
THROTTLE.set(isThrottled);
assertTime(() -> {
try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
for (int i = 0; i < 100; i++) {
assertThrows(RetriesExhaustedException.class, scanner::next);
}
}
return null;
}, WAIT_INTERVAL_NANOS, false);
}
}