HBASE-26807 Unify CallQueueTooBigException special pause with CallDroppedException (#4180)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
Bryan Beaudreault 2022-04-07 11:01:44 -04:00 committed by GitHub
parent c791c8181a
commit eb4c2ae4b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 499 additions and 190 deletions

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -28,14 +26,16 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
public class CallDroppedException extends IOException {
public class CallDroppedException extends HBaseServerException {
public CallDroppedException() {
super();
// For now all call drops are due to server being overloaded.
// We could decouple this if desired.
super(true);
}
// Absence of this constructor prevents proper unwrapping of
// remote exception on the client side
public CallDroppedException(String message) {
super(message);
super(true, message);
}
}

View File

@ -18,13 +18,15 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Returned to clients when their request was dropped because the call queue was too big to
* accept a new call. Clients should retry upon receiving it.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
public class CallQueueTooBigException extends IOException {
public class CallQueueTooBigException extends CallDroppedException {
public CallQueueTooBigException() {
super();
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Base class for exceptions thrown by an HBase server. May contain extra info about
* the state of the server when the exception was thrown.
*/
@InterfaceAudience.Public
public class HBaseServerException extends HBaseIOException {
private boolean serverOverloaded;
public HBaseServerException() {
this(false);
}
public HBaseServerException(String message) {
this(false, message);
}
public HBaseServerException(boolean serverOverloaded) {
this.serverOverloaded = serverOverloaded;
}
public HBaseServerException(boolean serverOverloaded, String message) {
super(message);
this.serverOverloaded = serverOverloaded;
}
/**
* @param t throwable to check for server overloaded state
* @return True if the server was considered overloaded when the exception was thrown
*/
public static boolean isServerOverloaded(Throwable t) {
if (t instanceof HBaseServerException) {
return ((HBaseServerException) t).isServerOverloaded();
}
return false;
}
/**
* Necessary for parsing RemoteException on client side
* @param serverOverloaded True if server was overloaded when exception was thrown
*/
public void setServerOverloaded(boolean serverOverloaded) {
this.serverOverloaded = serverOverloaded;
}
/**
* @return True if server was considered overloaded when exception was thrown
*/
public boolean isServerOverloaded() {
return serverOverloaded;
}
}

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -50,21 +51,42 @@ public interface AsyncAdminBuilder {
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
* @return this for invocation chaining
* @see #setRetryPauseForCQTBE(long, TimeUnit)
* @see #setRetryPauseForServerOverloaded(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);
/**
* Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
* exponential policy to generate sleep time when retrying.
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
* We use an exponential policy to generate sleep time from this base when retrying.
* <p/>
* This value should be greater than the normal pause value which could be set with the above
* {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
* means the server is overloaded. We just use the normal pause value for
* {@code CallQueueTooBigException} if here you specify a smaller value.
* {@link #setRetryPause(long, TimeUnit)} method, as usually
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
* specify a smaller value.
*
* @see #setRetryPause(long, TimeUnit)
* @deprecated Since 2.5.0, will be removed in 4.0.0. Please use
* {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead.
*/
@Deprecated
default AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) {
return setRetryPauseForServerOverloaded(pause, unit);
}
/**
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
* We use an exponential policy to generate sleep time when retrying.
* <p/>
* This value should be greater than the normal pause value which could be set with the above
* {@link #setRetryPause(long, TimeUnit)} method, as usually
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
* specify a smaller value.
*
* @see #setRetryPause(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
AsyncAdminBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit);
/**
* Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.

View File

@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
protected long pauseNs;
protected long pauseForCQTBENs;
protected long pauseNsForServerOverloaded;
protected int maxAttempts;
@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();
this.maxAttempts = connConf.getMaxRetries();
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
@ -67,8 +67,8 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
}
@Override
public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(timeout);
public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(timeout);
return this;
}

View File

@ -44,10 +44,10 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
private ServerName serverName;
public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}

View File

@ -45,9 +45,9 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RetryImmediatelyException;
@ -63,9 +63,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@ -104,7 +102,7 @@ class AsyncBatchRpcRetryingCaller<T> {
private final long pauseNs;
private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;
private final int maxAttempts;
@ -150,13 +148,14 @@ class AsyncBatchRpcRetryingCaller<T> {
}
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
TableName tableName, List<? extends Row> actions, long pauseNs,
long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@ -466,17 +465,17 @@ class AsyncBatchRpcRetryingCaller<T> {
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
error instanceof CallQueueTooBigException);
HBaseServerException.isServerOverloaded(error));
}
private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
boolean isCallQueueTooBig) {
boolean isServerOverloaded) {
if (immediately) {
groupAndSend(actions, tries);
return;
}
long delayNs;
long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {

View File

@ -75,7 +75,7 @@ class AsyncClientScanner {
private final long pauseNs;
private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;
private final int maxAttempts;
@ -90,7 +90,7 @@ class AsyncClientScanner {
private final Span span;
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
@ -104,7 +104,7 @@ class AsyncClientScanner {
this.conn = conn;
this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@ -198,7 +198,8 @@ class AsyncClientScanner {
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
try (Scope ignored = span.makeCurrent()) {
@ -232,7 +233,8 @@ class AsyncClientScanner {
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
}
}

View File

@ -30,7 +30,6 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
@ -52,6 +51,7 @@ import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFE
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -64,6 +64,23 @@ class AsyncConnectionConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
/**
* Parameter name for client pause when server is overloaded, denoted by
* {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded()}
*/
public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED =
"hbase.client.pause.server.overloaded";
static {
// This is added where the configs are referenced. It may be too late to happen before
// any user _sets_ the old cqtbe config onto a Configuration option. So we still need
// to handle checking both properties in parsing below. The benefit of calling this is
// that it should still cause Configuration to log a warning if we do end up falling
// through to the old deprecated config.
Configuration.addDeprecation(
HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED);
}
/**
* Configure the number of failures after which the client will start logging. A few failures
* is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
@ -92,7 +109,7 @@ class AsyncConnectionConfiguration {
private final long pauseNs;
private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;
private final int maxRetries;
@ -137,15 +154,17 @@ class AsyncConnectionConfiguration {
this.writeRpcTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs));
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
if (pauseForCQTBEMs < pauseMs) {
long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
if (pauseMsForServerOverloaded < pauseMs) {
LOG.warn(
"The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs);
pauseForCQTBEMs = pauseMs;
HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded,
HBASE_CLIENT_PAUSE, pauseMs);
pauseMsForServerOverloaded = pauseMs;
}
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseMsForServerOverloaded);
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
@ -196,8 +215,8 @@ class AsyncConnectionConfiguration {
return pauseNs;
}
long getPauseForCQTBENs() {
return pauseForCQTBENs;
long getPauseNsForServerOverloaded() {
return pauseNsForServerOverloaded;
}
int getMaxRetries() {

View File

@ -44,10 +44,10 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
private final Callable<T> callable;
public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
this.callable = callable;
}

View File

@ -31,8 +31,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
@InterfaceAudience.Private
@ -60,7 +59,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
private final long pauseNs;
private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;
private int tries = 1;
@ -81,13 +80,13 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller;
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.priority = priority;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@ -127,7 +126,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
}
private void tryScheduleRetry(Throwable error) {
long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
long pauseNsToUse = HBaseServerException.isServerOverloaded(error) ?
pauseNsForServerOverloaded : pauseNs;
long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;

View File

@ -58,7 +58,7 @@ class AsyncRpcRetryingCallerFactory {
protected long pauseNs = conn.connConf.getPauseNs();
protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs();
protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded();
protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
@ -119,8 +119,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public SingleRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(pause);
public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(pause);
return this;
}
@ -156,8 +156,8 @@ class AsyncRpcRetryingCallerFactory {
public AsyncSingleRequestRpcRetryingCaller<T> build() {
preCheck();
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@ -263,8 +263,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(pause);
public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(pause);
return this;
}
@ -292,8 +292,8 @@ class AsyncRpcRetryingCallerFactory {
preCheck();
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@ -347,8 +347,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(pause);
public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(pause);
return this;
}
@ -364,7 +364,8 @@ class AsyncRpcRetryingCallerFactory {
public <T> AsyncBatchRpcRetryingCaller<T> build() {
return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
}
public <T> List<CompletableFuture<T>> call() {
@ -406,8 +407,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public MasterRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(pause);
public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(pause);
return this;
}
@ -438,7 +439,8 @@ class AsyncRpcRetryingCallerFactory {
public AsyncMasterRequestRpcRetryingCaller<T> build() {
preCheck();
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
}
/**
@ -487,8 +489,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public AdminRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(pause);
public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(pause);
return this;
}
@ -514,8 +516,9 @@ class AsyncRpcRetryingCallerFactory {
public AsyncAdminRequestRetryingCaller<T> build() {
return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
checkNotNull(callable, "action is null"));
}
public CompletableFuture<T> call() {
@ -558,8 +561,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public ServerRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(pause);
public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(pause);
return this;
}
@ -579,7 +582,8 @@ class AsyncRpcRetryingCallerFactory {
}
public AsyncServerRequestRpcRetryingCaller<T> build() {
return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, pauseForCQTBENs,
return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs,
pauseNsForServerOverloaded,
maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
}

View File

@ -35,8 +35,8 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
@ -98,7 +98,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final long pauseNs;
private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;
private final int maxAttempts;
@ -307,7 +307,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
@ -320,7 +320,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
this.regionServerRemote = isRegionServerRemote;
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@ -410,7 +410,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
return;
}
long delayNs;
long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
long pauseNsToUse = HBaseServerException.isServerOverloaded(error) ?
pauseNsForServerOverloaded : pauseNs;
if (scanTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {

View File

@ -46,10 +46,10 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
private ServerName serverName;
public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseForCQTBENs, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseNsForServerOverloaded,
maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}

View File

@ -56,10 +56,10 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxAttempts,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
this.tableName = tableName;
this.row = row;
this.replicaId = replicaId;

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -76,21 +77,42 @@ public interface AsyncTableBuilder<C extends ScanResultConsumerBase> {
/**
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
* @see #setRetryPauseForCQTBE(long, TimeUnit)
* @see #setRetryPauseForServerOverloaded(long, TimeUnit)
*/
AsyncTableBuilder<C> setRetryPause(long pause, TimeUnit unit);
/**
* Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
* exponential policy to generate sleep time when retrying.
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
* We use an exponential policy to generate sleep time when retrying.
* <p/>
* This value should be greater than the normal pause value which could be set with the above
* {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
* means the server is overloaded. We just use the normal pause value for
* {@code CallQueueTooBigException} if here you specify a smaller value.
* {@link #setRetryPause(long, TimeUnit)} method, as usually
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
* specify a smaller value.
*
* @see #setRetryPause(long, TimeUnit)
* @deprecated Since 2.5.0, will be removed in 4.0.0. Please use
* {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead.
*/
@Deprecated
default AsyncTableBuilder<C> setRetryPauseForCQTBE(long pause, TimeUnit unit) {
return setRetryPauseForServerOverloaded(pause, unit);
}
/**
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
* We use an exponential policy to generate sleep time when retrying.
* <p/>
* This value should be greater than the normal pause value which could be set with the above
* {@link #setRetryPause(long, TimeUnit)} method, as usually
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
* specify a smaller value.
*
* @see #setRetryPause(long, TimeUnit)
*/
AsyncTableBuilder<C> setRetryPauseForCQTBE(long pause, TimeUnit unit);
AsyncTableBuilder<C> setRetryPauseForServerOverloaded(long pause, TimeUnit unit);
/**
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.

View File

@ -45,7 +45,7 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
protected long pauseNs;
protected long pauseForCQTBENs;
protected long pauseNsForServerOverloaded;
protected int maxAttempts;
@ -60,7 +60,7 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();
this.maxAttempts = retries2Attempts(connConf.getMaxRetries());
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
@ -102,8 +102,8 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
}
@Override
public AsyncTableBuilderBase<C> setRetryPauseForCQTBE(long pause, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(pause);
public AsyncTableBuilderBase<C> setRetryPauseForServerOverloaded(long pause, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(pause);
return this;
}

View File

@ -62,7 +62,7 @@ public class ConnectionConfiguration {
// toggle for async/sync prefetch
private final boolean clientScannerAsyncPrefetch;
/**
/**
* Constructor
* @param conf Configuration object
*/
@ -206,5 +206,4 @@ public class ConnectionConfiguration {
public int getRpcTimeout() {
return rpcTimeout;
}
}

View File

@ -193,7 +193,8 @@ class ConnectionOverAsyncConnection implements Connection {
conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
.setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
.setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
.setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(),
.setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS)
.build(),
poolSupplier);
}
};

View File

@ -364,7 +364,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private final long pauseNs;
private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;
private final int maxAttempts;
@ -380,15 +380,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
this.rpcTimeoutNs = builder.rpcTimeoutNs;
this.operationTimeoutNs = builder.operationTimeoutNs;
this.pauseNs = builder.pauseNs;
if (builder.pauseForCQTBENs < builder.pauseNs) {
if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
LOG.warn(
"Configured value of pauseForCQTBENs is {} ms, which is less than" +
"Configured value of pauseNsForServerOverloaded is {} ms, which is less than" +
" the normal pause value {} ms, use the greater one instead",
TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
this.pauseForCQTBENs = builder.pauseNs;
this.pauseNsForServerOverloaded = builder.pauseNs;
} else {
this.pauseForCQTBENs = builder.pauseForCQTBENs;
this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
}
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
@ -399,7 +399,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return this.connection.callerFactory.<T> masterRequest()
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
@ -407,7 +408,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return this.connection.callerFactory.<T> adminRequest()
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
@ -3551,7 +3553,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return this.connection.callerFactory.<T> serverRequest()
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}

View File

@ -110,7 +110,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private final long pauseNs;
private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;
private final int maxAttempts;
@ -126,15 +126,15 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
this.operationTimeoutNs = builder.operationTimeoutNs;
this.scanTimeoutNs = builder.scanTimeoutNs;
this.pauseNs = builder.pauseNs;
if (builder.pauseForCQTBENs < builder.pauseNs) {
if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
LOG.warn(
"Configured value of pauseForCQTBENs is {} ms, which is less than" +
"Configured value of pauseNsForServerOverloaded is {} ms, which is less than" +
" the normal pause value {} ms, use the greater one instead",
TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
this.pauseForCQTBENs = builder.pauseNs;
this.pauseNsForServerOverloaded = builder.pauseNs;
} else {
this.pauseForCQTBENs = builder.pauseForCQTBENs;
this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
}
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
@ -204,7 +204,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
@ -618,7 +619,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs,
startLogErrorsCnt)
.start();
}
@ -718,7 +720,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).call();
}

View File

@ -111,28 +111,6 @@ public final class ClientExceptionsUtil {
return null;
}
/**
* Checks if the exception is CallQueueTooBig exception (maybe wrapped
* into some RemoteException).
* @param t exception to check
* @return true if it's a CQTBE, false otherwise
*/
public static boolean isCallQueueTooBigException(Throwable t) {
t = findException(t);
return (t instanceof CallQueueTooBigException);
}
/**
* Checks if the exception is CallDroppedException (maybe wrapped
* into some RemoteException).
* @param t exception to check
* @return true if it's a CQTBE, false otherwise
*/
public static boolean isCallDroppedException(Throwable t) {
t = findException(t);
return (t instanceof CallDroppedException);
}
// This list covers most connectivity exceptions but not all.
// For example, in SocketOutputStream a plain IOException is thrown at times when the channel is
// closed.

View File

@ -40,14 +40,12 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@ -140,11 +138,13 @@ class IPCUtil {
static RemoteException createRemoteException(final ExceptionResponse e) {
String innerExceptionClassName = e.getExceptionClassName();
boolean doNotRetry = e.getDoNotRetry();
boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded();
return e.hasHostname() ?
// If a hostname then add it to the RemoteWithExtrasException
new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
e.getPort(), doNotRetry)
: new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
// If a hostname then add it to the RemoteWithExtrasException
new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
e.getPort(), doNotRetry, serverOverloaded) :
new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry,
serverOverloaded);
}
/**

View File

@ -25,6 +25,7 @@ import java.security.PrivilegedAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.util.DynamicClassLoader;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
@ -41,6 +42,7 @@ public class RemoteWithExtrasException extends RemoteException {
private final String hostname;
private final int port;
private final boolean doNotRetry;
private final boolean serverOverloaded;
/**
* Dynamic class loader to load filter/comparators
@ -58,15 +60,26 @@ public class RemoteWithExtrasException extends RemoteException {
}
public RemoteWithExtrasException(String className, String msg, final boolean doNotRetry) {
this(className, msg, null, -1, doNotRetry);
this(className, msg, doNotRetry, false);
}
public RemoteWithExtrasException(String className, String msg, final boolean doNotRetry,
final boolean serverOverloaded) {
this(className, msg, null, -1, doNotRetry, serverOverloaded);
}
public RemoteWithExtrasException(String className, String msg, final String hostname,
final int port, final boolean doNotRetry) {
final int port, final boolean doNotRetry) {
this(className, msg, hostname, port, doNotRetry, false);
}
public RemoteWithExtrasException(String className, String msg, final String hostname,
final int port, final boolean doNotRetry, final boolean serverOverloaded) {
super(className, msg);
this.hostname = hostname;
this.port = port;
this.doNotRetry = doNotRetry;
this.serverOverloaded = serverOverloaded;
}
@Override
@ -98,6 +111,17 @@ public class RemoteWithExtrasException extends RemoteException {
cn.setAccessible(true);
IOException ex = cn.newInstance(this.getMessage());
ex.initCause(this);
if (ex instanceof HBaseServerException) {
// this is a newly constructed exception.
// if an exception defaults to meaning isServerOverloaded, we use that.
// otherwise, see if the remote exception value should mean setting to true.
HBaseServerException serverException = (HBaseServerException) ex;
if (serverOverloaded && !serverException.isServerOverloaded()) {
serverException.setServerOverloaded(true);
}
}
return ex;
}
@ -121,4 +145,11 @@ public class RemoteWithExtrasException extends RemoteException {
public boolean isDoNotRetry() {
return this.doNotRetry;
}
/**
* @return True if the server was considered overloaded when the exception was thrown.
*/
public boolean isServerOverloaded() {
return serverOverloaded;
}
}

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -40,6 +40,23 @@ public class TestAsyncConnectionConfiguration {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncConnectionConfiguration.class);
@Test
public void itHandlesDeprecatedPauseForCQTBE() {
Configuration conf = new Configuration();
long timeoutMs = 1000;
conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, timeoutMs);
AsyncConnectionConfiguration config = new AsyncConnectionConfiguration(conf);
assertTrue(Configuration.isDeprecated(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE));
long expected = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
assertEquals(expected, config.getPauseNsForServerOverloaded());
conf = new Configuration();
conf.setLong(AsyncConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, timeoutMs);
config = new AsyncConnectionConfiguration(conf);
assertEquals(expected, config.getPauseNsForServerOverloaded());
}
@Test
public void testDefaultReadWriteRpcTimeout() {
Configuration conf = HBaseConfiguration.create();

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ClientTests.class, SmallTests.class })
public class TestRemoteWithExtrasException {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRemoteWithExtrasException.class);
/**
* test verifies that we honor the inherent value of an exception for isServerOverloaded.
* We don't want a false value passed into RemoteWithExtrasExceptions to override the
* inherent value of an exception if it's already true. This could be due to an out of date
* server not sending the proto field we expect.
*/
@Test
public void itUsesExceptionDefaultValueForServerOverloaded() {
// pass false for server overloaded, we still expect the exception to be true due to
// the exception type
RemoteWithExtrasException ex =
new RemoteWithExtrasException(ServerOverloadedException.class.getName(),
"server is overloaded", false, false);
IOException result = ex.unwrapRemoteException();
assertEquals(result.getClass(), ServerOverloadedException.class);
assertTrue(((ServerOverloadedException) result).isServerOverloaded());
}
@Test
public void itUsesPassedServerOverloadedValue() {
String exceptionClass = HBaseServerException.class.getName();
String message = "server is overloaded";
RemoteWithExtrasException ex =
new RemoteWithExtrasException(exceptionClass, message, false, false);
IOException result = ex.unwrapRemoteException();
assertTrue(result instanceof HBaseServerException);
assertFalse(((HBaseServerException) result).isServerOverloaded());
// run again with true value passed in
ex = new RemoteWithExtrasException(exceptionClass, message, false, true);
result = ex.unwrapRemoteException();
assertTrue(result instanceof HBaseServerException);
// expect true this time
assertTrue(((HBaseServerException) result).isServerOverloaded());
}
private static class ServerOverloadedException extends HBaseServerException {
public ServerOverloadedException(String message) {
super(true, message);
}
}
}

View File

@ -798,7 +798,10 @@ public final class HConstants {
/**
* Parameter name for client pause value for special case such as call queue too big, etc.
* @deprecated Since 2.5.0, will be removed in 4.0.0. Please use
* hbase.client.pause.server.overloaded instead.
*/
@Deprecated
public static final String HBASE_CLIENT_PAUSE_FOR_CQTBE = "hbase.client.pause.cqtbe";
/**

View File

@ -493,12 +493,14 @@ possible configurations would overwhelm and obscure the important.
this initial pause amount and how this pause works w/ retries.</description>
</property>
<property>
<name>hbase.client.pause.cqtbe</name>
<name>hbase.client.pause.server.overloaded</name>
<value></value>
<description>Whether or not to use a special client pause for
CallQueueTooBigException (cqtbe). Set this property to a higher value
than hbase.client.pause if you observe frequent CQTBE from the same
RegionServer and the call queue there keeps full</description>
<description>Pause time when encountering an exception indicating a
server is overloaded, CallQueueTooBigException or CallDroppedException.
Set this property to a higher value than hbase.client.pause if you
observe frequent CallQueueTooBigException or CallDroppedException from the same
RegionServer and the call queue there keeps filling up. This config used to be
called hbase.client.pause.cqtbe, which has been deprecated as of 2.5.0.</description>
</property>
<property>
<name>hbase.client.retries.number</name>

View File

@ -119,6 +119,8 @@ message ExceptionResponse {
optional int32 port = 4;
// Set if we are NOT to retry on receipt of this exception
optional bool do_not_retry = 5;
// Set true if the server was considered to be overloaded when exception was thrown
optional bool server_overloaded = 6;
}
/**

View File

@ -53,7 +53,7 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
RegionInfo replica, List<Entry> entries) {
super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()),
conn.connConf.getPauseNs(), conn.connConf.getPauseForCQTBENs(), maxAttempts,
conn.connConf.getPauseNs(), conn.connConf.getPauseNsForServerOverloaded(), maxAttempts,
operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
this.replica = replica;
this.entries = entries.toArray(new Entry[0]);

View File

@ -29,6 +29,7 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
@ -320,6 +321,9 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
RegionMovedException rme = (RegionMovedException)t;
exceptionBuilder.setHostname(rme.getHostname());
exceptionBuilder.setPort(rme.getPort());
} else if (t instanceof HBaseServerException) {
HBaseServerException hse = (HBaseServerException) t;
exceptionBuilder.setServerOverloaded(hse.isServerOverloaded());
}
// Set the exception as the result of the method invocation.
headerBuilder.setException(exceptionBuilder.build());

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -36,9 +37,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
@ -56,31 +59,46 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncClientPauseForCallQueueTooBig {
public class TestAsyncClientPauseForServerOverloaded {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class);
HBaseClassTestRule.forClass(TestAsyncClientPauseForServerOverloaded.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static TableName TABLE_NAME = TableName.valueOf("CQTBE");
private static TableName TABLE_NAME = TableName.valueOf("ServerOverloaded");
private static byte[] FAMILY = Bytes.toBytes("Family");
private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
private static long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1);
private static long PAUSE_FOR_SERVER_OVERLOADED_NANOS = TimeUnit.SECONDS.toNanos(1);
private static long PAUSE_FOR_SERVER_OVERLOADED_MILLIS = TimeUnit.NANOSECONDS.toMillis(
PAUSE_FOR_SERVER_OVERLOADED_NANOS);
private static AsyncConnection CONN;
private static boolean FAIL = false;
private static volatile FailMode MODE = null;
private static ConcurrentMap<MethodDescriptor, AtomicInteger> INVOKED = new ConcurrentHashMap<>();
enum FailMode {
CALL_QUEUE_TOO_BIG,
CALL_DROPPED;
public static final class CQTBERpcScheduler extends SimpleRpcScheduler {
private ConcurrentMap<MethodDescriptor, AtomicInteger> invoked = new ConcurrentHashMap<>();
public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
// this is for test scan, where we will send a open scanner first and then a next, and we
// expect that we hit CQTBE two times.
private boolean shouldFail(CallRunner callRunner) {
MethodDescriptor method = callRunner.getRpcCall().getMethod();
return invoked.computeIfAbsent(method,
k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0;
}
}
public static final class OverloadedRpcScheduler extends SimpleRpcScheduler {
public OverloadedRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
Abortable server, int highPriorityLevel) {
super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
@ -89,25 +107,36 @@ public class TestAsyncClientPauseForCallQueueTooBig {
@Override
public boolean dispatch(CallRunner callTask) {
if (FAIL) {
MethodDescriptor method = callTask.getRpcCall().getMethod();
// this is for test scan, where we will send a open scanner first and then a next, and we
// expect that we hit CQTBE two times.
if (INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0) {
return false;
}
if (MODE == FailMode.CALL_QUEUE_TOO_BIG && MODE.shouldFail(callTask)) {
return false;
}
return super.dispatch(callTask);
}
}
public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory {
public static final class OverloadedQueue extends TestPluggableQueueImpl {
public OverloadedQueue(int maxQueueLength, PriorityFunction priority, Configuration conf) {
super(maxQueueLength, priority, conf);
}
@Override
public boolean offer(CallRunner callRunner) {
if (MODE == FailMode.CALL_DROPPED && MODE.shouldFail(callRunner)) {
callRunner.drop();
return true;
}
return super.offer(callRunner);
}
}
public static final class OverloadedRpcSchedulerFactory extends SimpleRpcSchedulerFactory {
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new CQTBERpcScheduler(conf, handlerCount,
return new OverloadedRpcScheduler(conf, handlerCount,
conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
@ -122,12 +151,16 @@ public class TestAsyncClientPauseForCallQueueTooBig {
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS));
UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable");
UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
OverloadedQueue.class, PluggableBlockingQueue.class);
UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class);
OverloadedRpcSchedulerFactory.class, RpcSchedulerFactory.class);
UTIL.startMiniCluster(1);
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
Configuration conf = new Configuration(UTIL.getConfiguration());
conf.setLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, PAUSE_FOR_SERVER_OVERLOADED_MILLIS);
CONN = ConnectionFactory.createAsyncConnection(conf).get();
}
@AfterClass
@ -143,22 +176,28 @@ public class TestAsyncClientPauseForCallQueueTooBig {
table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
}
}
FAIL = true;
MODE = FailMode.CALL_QUEUE_TOO_BIG;
}
@After
public void tearDownAfterTest() throws IOException {
FAIL = false;
INVOKED.clear();
for (FailMode mode : FailMode.values()) {
mode.invoked.clear();
}
MODE = null;
UTIL.getAdmin().disableTable(TABLE_NAME);
UTIL.getAdmin().deleteTable(TABLE_NAME);
}
private void assertTime(Callable<Void> callable, long time) throws Exception {
long startNs = System.nanoTime();
callable.call();
long costNs = System.nanoTime() - startNs;
assertTrue(costNs > time);
for (FailMode mode : FailMode.values()) {
MODE = mode;
long startNs = System.nanoTime();
callable.call();
long costNs = System.nanoTime() - startNs;
assertTrue(costNs > time);
}
}
@Test
@ -167,7 +206,7 @@ public class TestAsyncClientPauseForCallQueueTooBig {
Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get();
assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
return null;
}, PAUSE_FOR_CQTBE_NS);
}, PAUSE_FOR_SERVER_OVERLOADED_NANOS);
}
@Test
@ -181,7 +220,7 @@ public class TestAsyncClientPauseForCallQueueTooBig {
}
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
}, PAUSE_FOR_CQTBE_NS);
}, PAUSE_FOR_SERVER_OVERLOADED_NANOS);
}
@Test
@ -197,6 +236,6 @@ public class TestAsyncClientPauseForCallQueueTooBig {
assertNull(scanner.next());
}
return null;
}, PAUSE_FOR_CQTBE_NS * 2);
}, PAUSE_FOR_SERVER_OVERLOADED_NANOS * 2);
}
}