HBASE-26807 Unify CallQueueTooBigException special pause with CallDroppedException (#4273)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
parent
edbe44b0d1
commit
2240025349
|
@ -18,8 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -28,14 +26,16 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@SuppressWarnings("serial")
|
||||
@InterfaceAudience.Public
|
||||
public class CallDroppedException extends IOException {
|
||||
public class CallDroppedException extends HBaseServerException {
|
||||
public CallDroppedException() {
|
||||
super();
|
||||
// For now all call drops are due to server being overloaded.
|
||||
// We could decouple this if desired.
|
||||
super(true);
|
||||
}
|
||||
|
||||
// Absence of this constructor prevents proper unwrapping of
|
||||
// remote exception on the client side
|
||||
public CallDroppedException(String message) {
|
||||
super(message);
|
||||
super(true, message);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,13 +18,15 @@
|
|||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Returned to clients when their request was dropped because the call queue was too big to
|
||||
* accept a new call. Clients should retry upon receiving it.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
@InterfaceAudience.Public
|
||||
public class CallQueueTooBigException extends IOException {
|
||||
public class CallQueueTooBigException extends CallDroppedException {
|
||||
public CallQueueTooBigException() {
|
||||
super();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Base class for exceptions thrown by an HBase server. May contain extra info about
|
||||
* the state of the server when the exception was thrown.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public class HBaseServerException extends HBaseIOException {
|
||||
private boolean serverOverloaded;
|
||||
|
||||
public HBaseServerException() {
|
||||
this(false);
|
||||
}
|
||||
|
||||
public HBaseServerException(String message) {
|
||||
this(false, message);
|
||||
}
|
||||
|
||||
public HBaseServerException(boolean serverOverloaded) {
|
||||
this.serverOverloaded = serverOverloaded;
|
||||
}
|
||||
|
||||
public HBaseServerException(boolean serverOverloaded, String message) {
|
||||
super(message);
|
||||
this.serverOverloaded = serverOverloaded;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param t throwable to check for server overloaded state
|
||||
* @return True if the server was considered overloaded when the exception was thrown
|
||||
*/
|
||||
public static boolean isServerOverloaded(Throwable t) {
|
||||
if (t instanceof HBaseServerException) {
|
||||
return ((HBaseServerException) t).isServerOverloaded();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Necessary for parsing RemoteException on client side
|
||||
* @param serverOverloaded True if server was overloaded when exception was thrown
|
||||
*/
|
||||
public void setServerOverloaded(boolean serverOverloaded) {
|
||||
this.serverOverloaded = serverOverloaded;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if server was considered overloaded when exception was thrown
|
||||
*/
|
||||
public boolean isServerOverloaded() {
|
||||
return serverOverloaded;
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -50,21 +51,42 @@ public interface AsyncAdminBuilder {
|
|||
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
|
||||
* retrying.
|
||||
* @return this for invocation chaining
|
||||
* @see #setRetryPauseForCQTBE(long, TimeUnit)
|
||||
* @see #setRetryPauseForServerOverloaded(long, TimeUnit)
|
||||
*/
|
||||
AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
|
||||
* exponential policy to generate sleep time when retrying.
|
||||
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
|
||||
* We use an exponential policy to generate sleep time from this base when retrying.
|
||||
* <p/>
|
||||
* This value should be greater than the normal pause value which could be set with the above
|
||||
* {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
|
||||
* means the server is overloaded. We just use the normal pause value for
|
||||
* {@code CallQueueTooBigException} if here you specify a smaller value.
|
||||
* {@link #setRetryPause(long, TimeUnit)} method, as usually
|
||||
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
|
||||
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
|
||||
* specify a smaller value.
|
||||
*
|
||||
* @see #setRetryPause(long, TimeUnit)
|
||||
* @deprecated Since 2.5.0, will be removed in 4.0.0. Please use
|
||||
* {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) {
|
||||
return setRetryPauseForServerOverloaded(pause, unit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
|
||||
* We use an exponential policy to generate sleep time when retrying.
|
||||
* <p/>
|
||||
* This value should be greater than the normal pause value which could be set with the above
|
||||
* {@link #setRetryPause(long, TimeUnit)} method, as usually
|
||||
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
|
||||
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
|
||||
* specify a smaller value.
|
||||
*
|
||||
* @see #setRetryPause(long, TimeUnit)
|
||||
*/
|
||||
AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
|
||||
AsyncAdminBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.
|
||||
|
|
|
@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
|
|||
|
||||
protected long pauseNs;
|
||||
|
||||
protected long pauseForCQTBENs;
|
||||
protected long pauseNsForServerOverloaded;
|
||||
|
||||
protected int maxAttempts;
|
||||
|
||||
|
@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
|
|||
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
|
||||
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
|
||||
this.pauseNs = connConf.getPauseNs();
|
||||
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
|
||||
this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();
|
||||
this.maxAttempts = connConf.getMaxRetries();
|
||||
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
|
||||
}
|
||||
|
@ -67,8 +67,8 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
|
||||
this.pauseForCQTBENs = unit.toNanos(timeout);
|
||||
public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) {
|
||||
this.pauseNsForServerOverloaded = unit.toNanos(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -44,10 +44,10 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
|
|||
private ServerName serverName;
|
||||
|
||||
public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
|
||||
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
|
||||
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
|
||||
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
|
||||
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
|
||||
rpcTimeoutNs, startLogErrorsCnt);
|
||||
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
|
||||
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
|
||||
this.serverName = serverName;
|
||||
this.callable = callable;
|
||||
}
|
||||
|
|
|
@ -45,9 +45,9 @@ import java.util.function.Supplier;
|
|||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.commons.lang3.mutable.MutableBoolean;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.CellScannable;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RetryImmediatelyException;
|
||||
|
@ -63,9 +63,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.util.Timer;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
|
@ -104,7 +102,7 @@ class AsyncBatchRpcRetryingCaller<T> {
|
|||
|
||||
private final long pauseNs;
|
||||
|
||||
private final long pauseForCQTBENs;
|
||||
private final long pauseNsForServerOverloaded;
|
||||
|
||||
private final int maxAttempts;
|
||||
|
||||
|
@ -150,13 +148,14 @@ class AsyncBatchRpcRetryingCaller<T> {
|
|||
}
|
||||
|
||||
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
|
||||
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
|
||||
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||
TableName tableName, List<? extends Row> actions, long pauseNs,
|
||||
long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
|
||||
long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||
this.retryTimer = retryTimer;
|
||||
this.conn = conn;
|
||||
this.tableName = tableName;
|
||||
this.pauseNs = pauseNs;
|
||||
this.pauseForCQTBENs = pauseForCQTBENs;
|
||||
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
|
||||
this.maxAttempts = maxAttempts;
|
||||
this.operationTimeoutNs = operationTimeoutNs;
|
||||
this.rpcTimeoutNs = rpcTimeoutNs;
|
||||
|
@ -466,17 +465,17 @@ class AsyncBatchRpcRetryingCaller<T> {
|
|||
.collect(Collectors.toList());
|
||||
addError(copiedActions, error, serverName);
|
||||
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
|
||||
error instanceof CallQueueTooBigException);
|
||||
HBaseServerException.isServerOverloaded(error));
|
||||
}
|
||||
|
||||
private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
|
||||
boolean isCallQueueTooBig) {
|
||||
boolean isServerOverloaded) {
|
||||
if (immediately) {
|
||||
groupAndSend(actions, tries);
|
||||
return;
|
||||
}
|
||||
long delayNs;
|
||||
long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
|
||||
long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
|
||||
if (operationTimeoutNs > 0) {
|
||||
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
|
||||
if (maxDelayNs <= 0) {
|
||||
|
|
|
@ -78,7 +78,7 @@ class AsyncClientScanner {
|
|||
|
||||
private final long pauseNs;
|
||||
|
||||
private final long pauseForCQTBENs;
|
||||
private final long pauseNsForServerOverloaded;
|
||||
|
||||
private final int maxAttempts;
|
||||
|
||||
|
@ -93,7 +93,7 @@ class AsyncClientScanner {
|
|||
private final Span span;
|
||||
|
||||
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
|
||||
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
|
||||
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
|
||||
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||
if (scan.getStartRow() == null) {
|
||||
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
|
||||
|
@ -107,7 +107,7 @@ class AsyncClientScanner {
|
|||
this.conn = conn;
|
||||
this.retryTimer = retryTimer;
|
||||
this.pauseNs = pauseNs;
|
||||
this.pauseForCQTBENs = pauseForCQTBENs;
|
||||
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
|
||||
this.maxAttempts = maxAttempts;
|
||||
this.scanTimeoutNs = scanTimeoutNs;
|
||||
this.rpcTimeoutNs = rpcTimeoutNs;
|
||||
|
@ -198,7 +198,8 @@ class AsyncClientScanner {
|
|||
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
|
||||
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
|
||||
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
|
||||
.maxAttempts(maxAttempts)
|
||||
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
|
||||
(hasMore, error) -> {
|
||||
try (Scope ignored = span.makeCurrent()) {
|
||||
|
@ -231,7 +232,8 @@ class AsyncClientScanner {
|
|||
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
|
||||
.priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS)
|
||||
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
|
||||
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
|
||||
.action(this::callOpenScanner).call();
|
||||
}
|
||||
|
|
|
@ -17,46 +17,15 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
|
||||
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Timeout configs.
|
||||
|
@ -64,7 +33,15 @@ import org.slf4j.LoggerFactory;
|
|||
@InterfaceAudience.Private
|
||||
class AsyncConnectionConfiguration {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
|
||||
/**
|
||||
* Configure the number of failures after which the client will start logging. A few failures
|
||||
* is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
|
||||
* heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at
|
||||
* this stage.
|
||||
*/
|
||||
public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
|
||||
"hbase.client.start.log.errors.counter";
|
||||
public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
|
||||
|
||||
private final long metaOperationTimeoutNs;
|
||||
|
||||
|
@ -84,7 +61,7 @@ class AsyncConnectionConfiguration {
|
|||
|
||||
private final long pauseNs;
|
||||
|
||||
private final long pauseForCQTBENs;
|
||||
private final long pauseNsForServerOverloaded;
|
||||
|
||||
private final int maxRetries;
|
||||
|
||||
|
@ -118,50 +95,45 @@ class AsyncConnectionConfiguration {
|
|||
private final int maxKeyValueSize;
|
||||
|
||||
AsyncConnectionConfiguration(Configuration conf) {
|
||||
ConnectionConfiguration connectionConf = new ConnectionConfiguration(conf);
|
||||
|
||||
// fields we can pull directly from connection configuration
|
||||
this.scannerCaching = connectionConf.getScannerCaching();
|
||||
this.scannerMaxResultSize = connectionConf.getScannerMaxResultSize();
|
||||
this.writeBufferSize = connectionConf.getWriteBufferSize();
|
||||
this.writeBufferPeriodicFlushTimeoutNs = connectionConf.getWriteBufferPeriodicFlushTimeoutMs();
|
||||
this.maxKeyValueSize = connectionConf.getMaxKeyValueSize();
|
||||
this.maxRetries = connectionConf.getRetriesNumber();
|
||||
|
||||
// fields from connection configuration that need to be converted to nanos
|
||||
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
|
||||
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
|
||||
long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
|
||||
this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs);
|
||||
connectionConf.getMetaOperationTimeout());
|
||||
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getOperationTimeout());
|
||||
this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getRpcTimeout());
|
||||
this.readRpcTimeoutNs =
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs));
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY,
|
||||
connectionConf.getReadRpcTimeout()));
|
||||
this.writeRpcTimeoutNs =
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs));
|
||||
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
|
||||
if (pauseForCQTBEMs < pauseMs) {
|
||||
LOG.warn(
|
||||
"The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
|
||||
HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs);
|
||||
pauseForCQTBEMs = pauseMs;
|
||||
}
|
||||
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
|
||||
this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
|
||||
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
|
||||
this.startLogErrorsCnt =
|
||||
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
connectionConf.getWriteRpcTimeout()));
|
||||
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getPauseMillis());
|
||||
this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(
|
||||
connectionConf.getPauseMillisForServerOverloaded());
|
||||
this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
|
||||
connectionConf.getPrimaryCallTimeoutMicroSecond());
|
||||
this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
|
||||
connectionConf.getReplicaCallTimeoutMicroSecondScan());
|
||||
this.primaryMetaScanTimeoutNs =
|
||||
TimeUnit.MICROSECONDS.toNanos(connectionConf.getMetaReplicaCallTimeoutMicroSecondScan());
|
||||
this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
|
||||
this.scannerCaching =
|
||||
conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
|
||||
|
||||
// fields not in connection configuration
|
||||
this.startLogErrorsCnt =
|
||||
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
|
||||
this.metaScannerCaching =
|
||||
conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
|
||||
this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||
DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
|
||||
this.writeBufferPeriodicFlushTimeoutNs =
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
|
||||
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
|
||||
this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
|
||||
conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
|
||||
this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
|
||||
conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
|
||||
this.primaryMetaScanTimeoutNs =
|
||||
TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
|
||||
HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
|
||||
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
|
||||
}
|
||||
|
||||
long getMetaOperationTimeoutNs() {
|
||||
|
@ -188,8 +160,8 @@ class AsyncConnectionConfiguration {
|
|||
return pauseNs;
|
||||
}
|
||||
|
||||
long getPauseForCQTBENs() {
|
||||
return pauseForCQTBENs;
|
||||
long getPauseNsForServerOverloaded() {
|
||||
return pauseNsForServerOverloaded;
|
||||
}
|
||||
|
||||
int getMaxRetries() {
|
||||
|
|
|
@ -44,10 +44,10 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
|
|||
private final Callable<T> callable;
|
||||
|
||||
public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
|
||||
Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries,
|
||||
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs,
|
||||
rpcTimeoutNs, startLogErrorsCnt);
|
||||
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
|
||||
int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries,
|
||||
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
|
||||
this.callable = callable;
|
||||
}
|
||||
|
||||
|
|
|
@ -134,14 +134,13 @@ class AsyncProcess {
|
|||
final long id;
|
||||
|
||||
final ClusterConnection connection;
|
||||
final ConnectionConfiguration connectionConfiguration;
|
||||
private final RpcRetryingCallerFactory rpcCallerFactory;
|
||||
final RpcControllerFactory rpcFactory;
|
||||
|
||||
// Start configuration settings.
|
||||
final int startLogErrorsCnt;
|
||||
|
||||
final long pause;
|
||||
final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
|
||||
final int numTries;
|
||||
long serverTrackerTimeout;
|
||||
final long primaryCallTimeoutMicroseconds;
|
||||
|
@ -156,30 +155,25 @@ class AsyncProcess {
|
|||
public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
|
||||
private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
|
||||
private final int periodToLog;
|
||||
|
||||
AsyncProcess(ClusterConnection hc, Configuration conf,
|
||||
RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
|
||||
this(hc, conf, rpcCaller, rpcFactory, hc.getConnectionConfiguration().getRetriesNumber());
|
||||
}
|
||||
|
||||
AsyncProcess(ClusterConnection hc, Configuration conf,
|
||||
RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory, int retriesNumber) {
|
||||
if (hc == null) {
|
||||
throw new IllegalArgumentException("ClusterConnection cannot be null.");
|
||||
}
|
||||
|
||||
this.connection = hc;
|
||||
this.connectionConfiguration = connection.getConnectionConfiguration();
|
||||
|
||||
this.id = COUNTER.incrementAndGet();
|
||||
|
||||
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
|
||||
if (configuredPauseForCQTBE < pause) {
|
||||
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
|
||||
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
|
||||
+ ", will use " + pause + " instead.");
|
||||
this.pauseForCQTBE = pause;
|
||||
} else {
|
||||
this.pauseForCQTBE = configuredPauseForCQTBE;
|
||||
}
|
||||
// how many times we could try in total, one more than retry number
|
||||
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
|
||||
this.numTries = retriesNumber + 1;
|
||||
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
|
||||
this.startLogErrorsCnt =
|
||||
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
|
||||
|
@ -193,7 +187,8 @@ class AsyncProcess {
|
|||
// we will do more retries in aggregate, but the user will be none the wiser.
|
||||
this.serverTrackerTimeout = 0L;
|
||||
for (int i = 0; i < this.numTries; ++i) {
|
||||
serverTrackerTimeout = serverTrackerTimeout + ConnectionUtils.getPauseTime(this.pause, i);
|
||||
serverTrackerTimeout = serverTrackerTimeout
|
||||
+ ConnectionUtils.getPauseTime(connectionConfiguration.getPauseMillis(), i);
|
||||
}
|
||||
|
||||
this.rpcCallerFactory = rpcCaller;
|
||||
|
|
|
@ -36,8 +36,8 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
|
@ -738,11 +738,14 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
long backOffTime;
|
||||
if (retryImmediately) {
|
||||
backOffTime = 0;
|
||||
} else if (throwable instanceof CallQueueTooBigException) {
|
||||
// Give a special check on CQTBE, see #HBASE-17114
|
||||
backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE);
|
||||
} else if (HBaseServerException.isServerOverloaded(throwable)) {
|
||||
// Give a special check when encountering an exception indicating the server is overloaded.
|
||||
// see #HBASE-17114 and HBASE-26807
|
||||
backOffTime = errorsByServer.calculateBackoffTime(oldServer,
|
||||
asyncProcess.connectionConfiguration.getPauseMillisForServerOverloaded());
|
||||
} else {
|
||||
backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause);
|
||||
backOffTime = errorsByServer.calculateBackoffTime(oldServer,
|
||||
asyncProcess.connectionConfiguration.getPauseMillis());
|
||||
}
|
||||
if (numAttempt > asyncProcess.startLogErrorsCnt) {
|
||||
// We use this value to have some logs when we have multiple failures, but not too many
|
||||
|
|
|
@ -31,8 +31,8 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||
|
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.util.FutureUtils;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.util.Timer;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -60,7 +59,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
|
|||
|
||||
private final long pauseNs;
|
||||
|
||||
private final long pauseForCQTBENs;
|
||||
private final long pauseNsForServerOverloaded;
|
||||
|
||||
private int tries = 1;
|
||||
|
||||
|
@ -81,13 +80,13 @@ public abstract class AsyncRpcRetryingCaller<T> {
|
|||
protected final HBaseRpcController controller;
|
||||
|
||||
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
|
||||
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
|
||||
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
|
||||
long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||
this.retryTimer = retryTimer;
|
||||
this.conn = conn;
|
||||
this.priority = priority;
|
||||
this.pauseNs = pauseNs;
|
||||
this.pauseForCQTBENs = pauseForCQTBENs;
|
||||
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
|
||||
this.maxAttempts = maxAttempts;
|
||||
this.operationTimeoutNs = operationTimeoutNs;
|
||||
this.rpcTimeoutNs = rpcTimeoutNs;
|
||||
|
@ -127,7 +126,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
|
|||
}
|
||||
|
||||
private void tryScheduleRetry(Throwable error) {
|
||||
long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
|
||||
long pauseNsToUse = HBaseServerException.isServerOverloaded(error) ?
|
||||
pauseNsForServerOverloaded : pauseNs;
|
||||
long delayNs;
|
||||
if (operationTimeoutNs > 0) {
|
||||
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
|
||||
|
|
|
@ -58,7 +58,7 @@ class AsyncRpcRetryingCallerFactory {
|
|||
|
||||
protected long pauseNs = conn.connConf.getPauseNs();
|
||||
|
||||
protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs();
|
||||
protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded();
|
||||
|
||||
protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
|
||||
|
||||
|
@ -119,8 +119,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
return this;
|
||||
}
|
||||
|
||||
public SingleRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
|
||||
this.pauseForCQTBENs = unit.toNanos(pause);
|
||||
public SingleRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
|
||||
this.pauseNsForServerOverloaded = unit.toNanos(pause);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -156,8 +156,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
public AsyncSingleRequestRpcRetryingCaller<T> build() {
|
||||
preCheck();
|
||||
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
|
||||
locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
|
||||
rpcTimeoutNs, startLogErrorsCnt);
|
||||
locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
|
||||
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -263,8 +263,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
return this;
|
||||
}
|
||||
|
||||
public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
|
||||
this.pauseForCQTBENs = unit.toNanos(pause);
|
||||
public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
|
||||
this.pauseNsForServerOverloaded = unit.toNanos(pause);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -292,8 +292,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
preCheck();
|
||||
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
|
||||
scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
|
||||
scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs,
|
||||
rpcTimeoutNs, startLogErrorsCnt);
|
||||
scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
|
||||
scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -347,8 +347,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
return this;
|
||||
}
|
||||
|
||||
public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
|
||||
this.pauseForCQTBENs = unit.toNanos(pause);
|
||||
public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
|
||||
this.pauseNsForServerOverloaded = unit.toNanos(pause);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -364,7 +364,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
|
||||
public <T> AsyncBatchRpcRetryingCaller<T> build() {
|
||||
return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
|
||||
pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
|
||||
pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
|
||||
startLogErrorsCnt);
|
||||
}
|
||||
|
||||
public <T> List<CompletableFuture<T>> call() {
|
||||
|
@ -406,8 +407,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
return this;
|
||||
}
|
||||
|
||||
public MasterRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
|
||||
this.pauseForCQTBENs = unit.toNanos(pause);
|
||||
public MasterRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
|
||||
this.pauseNsForServerOverloaded = unit.toNanos(pause);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -438,7 +439,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
public AsyncMasterRequestRpcRetryingCaller<T> build() {
|
||||
preCheck();
|
||||
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
|
||||
pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
|
||||
pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
|
||||
startLogErrorsCnt);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -487,8 +489,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
return this;
|
||||
}
|
||||
|
||||
public AdminRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
|
||||
this.pauseForCQTBENs = unit.toNanos(pause);
|
||||
public AdminRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
|
||||
this.pauseNsForServerOverloaded = unit.toNanos(pause);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -514,8 +516,9 @@ class AsyncRpcRetryingCallerFactory {
|
|||
|
||||
public AsyncAdminRequestRetryingCaller<T> build() {
|
||||
return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
|
||||
pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
|
||||
checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
|
||||
pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
|
||||
startLogErrorsCnt, checkNotNull(serverName, "serverName is null"),
|
||||
checkNotNull(callable, "action is null"));
|
||||
}
|
||||
|
||||
public CompletableFuture<T> call() {
|
||||
|
@ -558,8 +561,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
return this;
|
||||
}
|
||||
|
||||
public ServerRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
|
||||
this.pauseForCQTBENs = unit.toNanos(pause);
|
||||
public ServerRequestCallerBuilder<T> pauseForServerOverloaded(long pause, TimeUnit unit) {
|
||||
this.pauseNsForServerOverloaded = unit.toNanos(pause);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -579,7 +582,8 @@ class AsyncRpcRetryingCallerFactory {
|
|||
}
|
||||
|
||||
public AsyncServerRequestRpcRetryingCaller<T> build() {
|
||||
return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, pauseForCQTBENs,
|
||||
return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs,
|
||||
pauseNsForServerOverloaded,
|
||||
maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
|
||||
checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
|
||||
}
|
||||
|
|
|
@ -36,8 +36,8 @@ import java.util.List;
|
|||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
|
@ -101,7 +101,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
|
||||
private final long pauseNs;
|
||||
|
||||
private final long pauseForCQTBENs;
|
||||
private final long pauseNsForServerOverloaded;
|
||||
|
||||
private final int maxAttempts;
|
||||
|
||||
|
@ -310,7 +310,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
|
||||
AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
|
||||
boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
|
||||
long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
|
||||
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
|
||||
int startLogErrorsCnt) {
|
||||
this.retryTimer = retryTimer;
|
||||
this.scan = scan;
|
||||
|
@ -323,7 +323,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
this.regionServerRemote = isRegionServerRemote;
|
||||
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
|
||||
this.pauseNs = pauseNs;
|
||||
this.pauseForCQTBENs = pauseForCQTBENs;
|
||||
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
|
||||
this.maxAttempts = maxAttempts;
|
||||
this.scanTimeoutNs = scanTimeoutNs;
|
||||
this.rpcTimeoutNs = rpcTimeoutNs;
|
||||
|
@ -413,7 +413,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
return;
|
||||
}
|
||||
long delayNs;
|
||||
long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
|
||||
long pauseNsToUse = HBaseServerException.isServerOverloaded(error) ?
|
||||
pauseNsForServerOverloaded : pauseNs;
|
||||
if (scanTimeoutNs > 0) {
|
||||
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
|
||||
if (maxDelayNs <= 0) {
|
||||
|
|
|
@ -46,10 +46,10 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
|
|||
private ServerName serverName;
|
||||
|
||||
public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
|
||||
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
|
||||
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
|
||||
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
|
||||
super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseForCQTBENs, maxAttempts,
|
||||
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
|
||||
super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseNsForServerOverloaded,
|
||||
maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
|
||||
this.serverName = serverName;
|
||||
this.callable = callable;
|
||||
}
|
||||
|
|
|
@ -56,10 +56,10 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
|
|||
|
||||
public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
|
||||
TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
|
||||
Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxAttempts,
|
||||
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
|
||||
rpcTimeoutNs, startLogErrorsCnt);
|
||||
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
|
||||
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
|
||||
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
|
||||
this.tableName = tableName;
|
||||
this.row = row;
|
||||
this.replicaId = replicaId;
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -76,21 +77,42 @@ public interface AsyncTableBuilder<C extends ScanResultConsumerBase> {
|
|||
/**
|
||||
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
|
||||
* retrying.
|
||||
* @see #setRetryPauseForCQTBE(long, TimeUnit)
|
||||
* @see #setRetryPauseForServerOverloaded(long, TimeUnit)
|
||||
*/
|
||||
AsyncTableBuilder<C> setRetryPause(long pause, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
|
||||
* exponential policy to generate sleep time when retrying.
|
||||
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
|
||||
* We use an exponential policy to generate sleep time when retrying.
|
||||
* <p/>
|
||||
* This value should be greater than the normal pause value which could be set with the above
|
||||
* {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
|
||||
* means the server is overloaded. We just use the normal pause value for
|
||||
* {@code CallQueueTooBigException} if here you specify a smaller value.
|
||||
* {@link #setRetryPause(long, TimeUnit)} method, as usually
|
||||
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
|
||||
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
|
||||
* specify a smaller value.
|
||||
*
|
||||
* @see #setRetryPause(long, TimeUnit)
|
||||
* @deprecated Since 2.5.0, will be removed in 4.0.0. Please use
|
||||
* {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default AsyncTableBuilder<C> setRetryPauseForCQTBE(long pause, TimeUnit unit) {
|
||||
return setRetryPauseForServerOverloaded(pause, unit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
|
||||
* We use an exponential policy to generate sleep time when retrying.
|
||||
* <p/>
|
||||
* This value should be greater than the normal pause value which could be set with the above
|
||||
* {@link #setRetryPause(long, TimeUnit)} method, as usually
|
||||
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
|
||||
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
|
||||
* specify a smaller value.
|
||||
*
|
||||
* @see #setRetryPause(long, TimeUnit)
|
||||
*/
|
||||
AsyncTableBuilder<C> setRetryPauseForCQTBE(long pause, TimeUnit unit);
|
||||
AsyncTableBuilder<C> setRetryPauseForServerOverloaded(long pause, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.
|
||||
|
|
|
@ -45,7 +45,7 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
|
|||
|
||||
protected long pauseNs;
|
||||
|
||||
protected long pauseForCQTBENs;
|
||||
protected long pauseNsForServerOverloaded;
|
||||
|
||||
protected int maxAttempts;
|
||||
|
||||
|
@ -60,7 +60,7 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
|
|||
this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
|
||||
this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
|
||||
this.pauseNs = connConf.getPauseNs();
|
||||
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
|
||||
this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();
|
||||
this.maxAttempts = retries2Attempts(connConf.getMaxRetries());
|
||||
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
|
||||
}
|
||||
|
@ -102,8 +102,8 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
|
|||
}
|
||||
|
||||
@Override
|
||||
public AsyncTableBuilderBase<C> setRetryPauseForCQTBE(long pause, TimeUnit unit) {
|
||||
this.pauseForCQTBENs = unit.toNanos(pause);
|
||||
public AsyncTableBuilderBase<C> setRetryPauseForServerOverloaded(long pause, TimeUnit unit) {
|
||||
this.pauseNsForServerOverloaded = unit.toNanos(pause);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -11,9 +11,13 @@
|
|||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Configuration parameters for the connection.
|
||||
|
@ -25,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ConnectionConfiguration {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ConnectionConfiguration.class);
|
||||
|
||||
public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
|
||||
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
|
||||
|
@ -43,6 +48,24 @@ public class ConnectionConfiguration {
|
|||
"hbase.client.replicaCallTimeout.scan";
|
||||
public static final int PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT = 1000000; // 1s
|
||||
|
||||
/**
|
||||
* Parameter name for client pause when server is overloaded, denoted by an exception
|
||||
* where {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded(Throwable)}
|
||||
* is true.
|
||||
*/
|
||||
public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED =
|
||||
"hbase.client.pause.server.overloaded";
|
||||
|
||||
static {
|
||||
// This is added where the configs are referenced. It may be too late to happen before
|
||||
// any user _sets_ the old cqtbe config onto a Configuration option. So we still need
|
||||
// to handle checking both properties in parsing below. The benefit of calling this is
|
||||
// that it should still cause Configuration to log a warning if we do end up falling
|
||||
// through to the old deprecated config.
|
||||
Configuration.addDeprecation(
|
||||
HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED);
|
||||
}
|
||||
|
||||
private final long writeBufferSize;
|
||||
private final long writeBufferPeriodicFlushTimeoutMs;
|
||||
private final long writeBufferPeriodicFlushTimerTickMs;
|
||||
|
@ -60,6 +83,8 @@ public class ConnectionConfiguration {
|
|||
private final int writeRpcTimeout;
|
||||
// toggle for async/sync prefetch
|
||||
private final boolean clientScannerAsyncPrefetch;
|
||||
private final long pauseMs;
|
||||
private final long pauseMsForServerOverloaded;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
|
@ -115,6 +140,21 @@ public class ConnectionConfiguration {
|
|||
|
||||
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
|
||||
|
||||
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
|
||||
conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
|
||||
if (pauseMsForServerOverloaded < pauseMs) {
|
||||
LOG.warn(
|
||||
"The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
|
||||
HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded,
|
||||
HBASE_CLIENT_PAUSE, pauseMs);
|
||||
pauseMsForServerOverloaded = pauseMs;
|
||||
}
|
||||
|
||||
this.pauseMs = pauseMs;
|
||||
this.pauseMsForServerOverloaded = pauseMsForServerOverloaded;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -140,6 +180,8 @@ public class ConnectionConfiguration {
|
|||
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
|
||||
this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
|
||||
}
|
||||
|
||||
public int getReadRpcTimeout() {
|
||||
|
@ -205,4 +247,12 @@ public class ConnectionConfiguration {
|
|||
public int getRpcTimeout() {
|
||||
return rpcTimeout;
|
||||
}
|
||||
|
||||
public long getPauseMillis() {
|
||||
return pauseMs;
|
||||
}
|
||||
|
||||
public long getPauseMillisForServerOverloaded() {
|
||||
return pauseMsForServerOverloaded;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,9 +54,9 @@ import java.util.function.Supplier;
|
|||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.AuthUtil;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.ChoreService;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
|
@ -176,8 +176,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
|
||||
|
||||
private final long pause;
|
||||
private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
|
||||
// The mode tells if HedgedRead, LoadBalance mode is supported.
|
||||
// The default mode is CatalogReplicaMode.None.
|
||||
private CatalogReplicaMode metaReplicaMode;
|
||||
|
@ -275,16 +273,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
this.batchPool = (ThreadPoolExecutor) pool;
|
||||
this.connectionConfig = new ConnectionConfiguration(conf);
|
||||
this.closed = false;
|
||||
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
|
||||
if (configuredPauseForCQTBE < pause) {
|
||||
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
|
||||
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
|
||||
+ ", will use " + pause + " instead.");
|
||||
this.pauseForCQTBE = pause;
|
||||
} else {
|
||||
this.pauseForCQTBE = configuredPauseForCQTBE;
|
||||
}
|
||||
this.metaReplicaCallTimeoutScanInMicroSecond =
|
||||
connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();
|
||||
|
||||
|
@ -954,7 +942,7 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
metaCache.clearCache(tableName, row, replicaId);
|
||||
}
|
||||
// Query the meta region
|
||||
long pauseBase = this.pause;
|
||||
long pauseBase = connectionConfig.getPauseMillis();
|
||||
takeUserRegionLock();
|
||||
try {
|
||||
// We don't need to check if useCache is enabled or not. Even if useCache is false
|
||||
|
@ -1044,9 +1032,10 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
if (e instanceof RemoteException) {
|
||||
e = ((RemoteException) e).unwrapRemoteException();
|
||||
}
|
||||
if (e instanceof CallQueueTooBigException) {
|
||||
// Give a special check on CallQueueTooBigException, see #HBASE-17114
|
||||
pauseBase = this.pauseForCQTBE;
|
||||
if (HBaseServerException.isServerOverloaded(e)) {
|
||||
// Give a special pause when encountering an exception indicating the server
|
||||
// is overloaded. see #HBASE-17114 and HBASE-26807
|
||||
pauseBase = connectionConfig.getPauseMillisForServerOverloaded();
|
||||
}
|
||||
if (tries < maxAttempts - 1) {
|
||||
LOG.debug(
|
||||
|
|
|
@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -77,7 +76,7 @@ public class HTableMultiplexer {
|
|||
private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private final Configuration workerConf;
|
||||
private final Configuration conf;
|
||||
private final ClusterConnection conn;
|
||||
private final ExecutorService pool;
|
||||
private final int maxAttempts;
|
||||
|
@ -116,11 +115,7 @@ public class HTableMultiplexer {
|
|||
this.executor =
|
||||
Executors.newScheduledThreadPool(initThreads,
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
|
||||
|
||||
this.workerConf = HBaseConfiguration.create(conf);
|
||||
// We do not do the retry because we need to reassign puts to different queues if regions are
|
||||
// moved.
|
||||
this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -245,7 +240,7 @@ public class HTableMultiplexer {
|
|||
worker = serverToFlushWorkerMap.get(addr);
|
||||
if (worker == null) {
|
||||
// Create the flush worker
|
||||
worker = new FlushWorker(workerConf, this.conn, addr, this,
|
||||
worker = new FlushWorker(conf, this.conn, addr, this,
|
||||
perRegionServerBufferQueueSize, pool, executor);
|
||||
this.serverToFlushWorkerMap.put(addr, worker);
|
||||
executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
|
||||
|
@ -454,7 +449,9 @@ public class HTableMultiplexer {
|
|||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory);
|
||||
// Specify 0 retries in AsyncProcess because we need to reassign puts to different queues
|
||||
// if regions are moved.
|
||||
this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory, 0);
|
||||
this.executor = executor;
|
||||
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
|
||||
this.pool = pool;
|
||||
|
|
|
@ -333,7 +333,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
|
||||
private final long pauseNs;
|
||||
|
||||
private final long pauseForCQTBENs;
|
||||
private final long pauseNsForServerOverloaded;
|
||||
|
||||
private final int maxAttempts;
|
||||
|
||||
|
@ -349,15 +349,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
this.rpcTimeoutNs = builder.rpcTimeoutNs;
|
||||
this.operationTimeoutNs = builder.operationTimeoutNs;
|
||||
this.pauseNs = builder.pauseNs;
|
||||
if (builder.pauseForCQTBENs < builder.pauseNs) {
|
||||
if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
|
||||
LOG.warn(
|
||||
"Configured value of pauseForCQTBENs is {} ms, which is less than" +
|
||||
"Configured value of pauseNsForServerOverloaded is {} ms, which is less than" +
|
||||
" the normal pause value {} ms, use the greater one instead",
|
||||
TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
|
||||
TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
|
||||
TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
|
||||
this.pauseForCQTBENs = builder.pauseNs;
|
||||
this.pauseNsForServerOverloaded = builder.pauseNs;
|
||||
} else {
|
||||
this.pauseForCQTBENs = builder.pauseForCQTBENs;
|
||||
this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
|
||||
}
|
||||
this.maxAttempts = builder.maxAttempts;
|
||||
this.startLogErrorsCnt = builder.startLogErrorsCnt;
|
||||
|
@ -368,7 +368,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
return this.connection.callerFactory.<T> masterRequest()
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS)
|
||||
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
|
||||
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
|
||||
}
|
||||
|
||||
|
@ -376,7 +377,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
return this.connection.callerFactory.<T> adminRequest()
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS)
|
||||
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
|
||||
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
|
||||
}
|
||||
|
||||
|
@ -3486,7 +3488,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
return this.connection.callerFactory.<T> serverRequest()
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS)
|
||||
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
|
||||
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMu
|
|||
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
|
||||
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
|
@ -59,11 +58,9 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.Timer;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||
|
@ -112,7 +109,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
|
||||
private final long pauseNs;
|
||||
|
||||
private final long pauseForCQTBENs;
|
||||
private final long pauseNsForServerOverloaded;
|
||||
|
||||
private final int maxAttempts;
|
||||
|
||||
|
@ -128,15 +125,15 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
this.operationTimeoutNs = builder.operationTimeoutNs;
|
||||
this.scanTimeoutNs = builder.scanTimeoutNs;
|
||||
this.pauseNs = builder.pauseNs;
|
||||
if (builder.pauseForCQTBENs < builder.pauseNs) {
|
||||
if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
|
||||
LOG.warn(
|
||||
"Configured value of pauseForCQTBENs is {} ms, which is less than"
|
||||
+ " the normal pause value {} ms, use the greater one instead",
|
||||
TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
|
||||
"Configured value of pauseNsForServerOverloaded is {} ms, which is less than" +
|
||||
" the normal pause value {} ms, use the greater one instead",
|
||||
TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
|
||||
TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
|
||||
this.pauseForCQTBENs = builder.pauseNs;
|
||||
this.pauseNsForServerOverloaded = builder.pauseNs;
|
||||
} else {
|
||||
this.pauseForCQTBENs = builder.pauseForCQTBENs;
|
||||
this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
|
||||
}
|
||||
this.maxAttempts = builder.maxAttempts;
|
||||
this.startLogErrorsCnt = builder.startLogErrorsCnt;
|
||||
|
@ -242,10 +239,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
}
|
||||
|
||||
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
|
||||
return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
|
||||
return conn.callerFactory.<T>single().table(tableName).row(row).priority(priority)
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS)
|
||||
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
|
||||
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
|
||||
}
|
||||
|
||||
|
@ -648,8 +646,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
@Override
|
||||
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
|
||||
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
|
||||
pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
|
||||
.start();
|
||||
pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs,
|
||||
startLogErrorsCnt).start();
|
||||
}
|
||||
|
||||
private long resultSize2CacheSize(long maxResultSize) {
|
||||
|
@ -744,7 +742,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
return conn.callerFactory.batch().table(tableName).actions(actions)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
|
||||
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
|
||||
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
|
||||
.startLogErrorsCnt(startLogErrorsCnt).call();
|
||||
}
|
||||
|
||||
|
|
|
@ -21,8 +21,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Factory to create an {@link RpcRetryingCaller}
|
||||
|
@ -32,12 +30,8 @@ public class RpcRetryingCallerFactory {
|
|||
|
||||
/** Configuration key for a custom {@link RpcRetryingCaller} */
|
||||
public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RpcRetryingCallerFactory.class);
|
||||
protected final Configuration conf;
|
||||
private final long pause;
|
||||
private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
|
||||
private final int retries;
|
||||
private final int rpcTimeout;
|
||||
private final ConnectionConfiguration connectionConf;
|
||||
private final RetryingCallerInterceptor interceptor;
|
||||
private final int startLogErrorsCnt;
|
||||
/* These below data members are UNUSED!!!*/
|
||||
|
@ -50,25 +44,12 @@ public class RpcRetryingCallerFactory {
|
|||
|
||||
public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
|
||||
this.conf = conf;
|
||||
pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
|
||||
if (configuredPauseForCQTBE < pause) {
|
||||
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
|
||||
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
|
||||
+ ", will use " + pause + " instead.");
|
||||
this.pauseForCQTBE = pause;
|
||||
} else {
|
||||
this.pauseForCQTBE = configuredPauseForCQTBE;
|
||||
}
|
||||
retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
|
||||
this.connectionConf = new ConnectionConfiguration(conf);
|
||||
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
|
||||
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
|
||||
this.interceptor = interceptor;
|
||||
enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
|
||||
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
|
||||
rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -84,9 +65,11 @@ public class RpcRetryingCallerFactory {
|
|||
public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
|
||||
// We store the values in the factory instance. This way, constructing new objects
|
||||
// is cheap as it does not require parsing a complex structure.
|
||||
RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries,
|
||||
return new RpcRetryingCallerImpl<>(
|
||||
connectionConf.getPauseMillis(),
|
||||
connectionConf.getPauseMillisForServerOverloaded(),
|
||||
connectionConf.getRetriesNumber(),
|
||||
interceptor, startLogErrorsCnt, rpcTimeout);
|
||||
return caller;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,9 +78,12 @@ public class RpcRetryingCallerFactory {
|
|||
public <T> RpcRetryingCaller<T> newCaller() {
|
||||
// We store the values in the factory instance. This way, constructing new objects
|
||||
// is cheap as it does not require parsing a complex structure.
|
||||
RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries,
|
||||
interceptor, startLogErrorsCnt, rpcTimeout);
|
||||
return caller;
|
||||
return new RpcRetryingCallerImpl<>(
|
||||
connectionConf.getPauseMillis(),
|
||||
connectionConf.getPauseMillisForServerOverloaded(),
|
||||
connectionConf.getRetriesNumber(),
|
||||
interceptor, startLogErrorsCnt,
|
||||
connectionConf.getRpcTimeout());
|
||||
}
|
||||
|
||||
public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
|
||||
|
|
|
@ -30,8 +30,8 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
||||
|
@ -60,7 +60,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
|||
private final int startLogErrorsCnt;
|
||||
|
||||
private final long pause;
|
||||
private final long pauseForCQTBE;
|
||||
private final long pauseForServerOverloaded;
|
||||
private final int maxAttempts;// how many times to try
|
||||
private final int rpcTimeout;// timeout for each rpc request
|
||||
private final AtomicBoolean cancelled = new AtomicBoolean(false);
|
||||
|
@ -68,15 +68,16 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
|||
private final RetryingCallerInterceptorContext context;
|
||||
private final RetryingTimeTracker tracker;
|
||||
|
||||
public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, int startLogErrorsCnt) {
|
||||
this(pause, pauseForCQTBE, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,
|
||||
startLogErrorsCnt, 0);
|
||||
public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
|
||||
int startLogErrorsCnt) {
|
||||
this(pause, pauseForServerOverloaded, retries,
|
||||
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
|
||||
}
|
||||
|
||||
public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries,
|
||||
public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
|
||||
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
|
||||
this.pause = pause;
|
||||
this.pauseForCQTBE = pauseForCQTBE;
|
||||
this.pauseForServerOverloaded = pauseForServerOverloaded;
|
||||
this.maxAttempts = retries2Attempts(retries);
|
||||
this.interceptor = interceptor;
|
||||
context = interceptor.createEmptyContext();
|
||||
|
@ -148,8 +149,10 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
|
|||
// If the server is dead, we need to wait a little before retrying, to give
|
||||
// a chance to the regions to be moved
|
||||
// get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
|
||||
// special when encountering CallQueueTooBigException, see #HBASE-17114
|
||||
long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause;
|
||||
// special when encountering an exception indicating the server is overloaded.
|
||||
// see #HBASE-17114 and HBASE-26807
|
||||
long pauseBase = HBaseServerException.isServerOverloaded(t)
|
||||
? pauseForServerOverloaded : pause;
|
||||
expectedSleep = callable.sleep(pauseBase, tries);
|
||||
|
||||
// If, after the planned sleep, there won't be enough time left, we stop now.
|
||||
|
|
|
@ -111,28 +111,6 @@ public final class ClientExceptionsUtil {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the exception is CallQueueTooBig exception (maybe wrapped
|
||||
* into some RemoteException).
|
||||
* @param t exception to check
|
||||
* @return true if it's a CQTBE, false otherwise
|
||||
*/
|
||||
public static boolean isCallQueueTooBigException(Throwable t) {
|
||||
t = findException(t);
|
||||
return (t instanceof CallQueueTooBigException);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the exception is CallDroppedException (maybe wrapped
|
||||
* into some RemoteException).
|
||||
* @param t exception to check
|
||||
* @return true if it's a CQTBE, false otherwise
|
||||
*/
|
||||
public static boolean isCallDroppedException(Throwable t) {
|
||||
t = findException(t);
|
||||
return (t instanceof CallDroppedException);
|
||||
}
|
||||
|
||||
// This list covers most connectivity exceptions but not all.
|
||||
// For example, in SocketOutputStream a plain IOException is thrown at times when the channel is
|
||||
// closed.
|
||||
|
|
|
@ -40,14 +40,12 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||
|
@ -140,11 +138,13 @@ class IPCUtil {
|
|||
static RemoteException createRemoteException(final ExceptionResponse e) {
|
||||
String innerExceptionClassName = e.getExceptionClassName();
|
||||
boolean doNotRetry = e.getDoNotRetry();
|
||||
boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded();
|
||||
return e.hasHostname() ?
|
||||
// If a hostname then add it to the RemoteWithExtrasException
|
||||
new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
|
||||
e.getPort(), doNotRetry)
|
||||
: new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
|
||||
e.getPort(), doNotRetry, serverOverloaded) :
|
||||
new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry,
|
||||
serverOverloaded);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.security.PrivilegedAction;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.util.DynamicClassLoader;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -41,6 +42,7 @@ public class RemoteWithExtrasException extends RemoteException {
|
|||
private final String hostname;
|
||||
private final int port;
|
||||
private final boolean doNotRetry;
|
||||
private final boolean serverOverloaded;
|
||||
|
||||
/**
|
||||
* Dynamic class loader to load filter/comparators
|
||||
|
@ -58,15 +60,26 @@ public class RemoteWithExtrasException extends RemoteException {
|
|||
}
|
||||
|
||||
public RemoteWithExtrasException(String className, String msg, final boolean doNotRetry) {
|
||||
this(className, msg, null, -1, doNotRetry);
|
||||
this(className, msg, doNotRetry, false);
|
||||
}
|
||||
|
||||
public RemoteWithExtrasException(String className, String msg, final boolean doNotRetry,
|
||||
final boolean serverOverloaded) {
|
||||
this(className, msg, null, -1, doNotRetry, serverOverloaded);
|
||||
}
|
||||
|
||||
public RemoteWithExtrasException(String className, String msg, final String hostname,
|
||||
final int port, final boolean doNotRetry) {
|
||||
this(className, msg, hostname, port, doNotRetry, false);
|
||||
}
|
||||
|
||||
public RemoteWithExtrasException(String className, String msg, final String hostname,
|
||||
final int port, final boolean doNotRetry, final boolean serverOverloaded) {
|
||||
super(className, msg);
|
||||
this.hostname = hostname;
|
||||
this.port = port;
|
||||
this.doNotRetry = doNotRetry;
|
||||
this.serverOverloaded = serverOverloaded;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -98,6 +111,17 @@ public class RemoteWithExtrasException extends RemoteException {
|
|||
cn.setAccessible(true);
|
||||
IOException ex = cn.newInstance(this.getMessage());
|
||||
ex.initCause(this);
|
||||
|
||||
if (ex instanceof HBaseServerException) {
|
||||
// this is a newly constructed exception.
|
||||
// if an exception defaults to meaning isServerOverloaded, we use that.
|
||||
// otherwise, see if the remote exception value should mean setting to true.
|
||||
HBaseServerException serverException = (HBaseServerException) ex;
|
||||
if (serverOverloaded && !serverException.isServerOverloaded()) {
|
||||
serverException.setServerOverloaded(true);
|
||||
}
|
||||
}
|
||||
|
||||
return ex;
|
||||
}
|
||||
|
||||
|
@ -121,4 +145,11 @@ public class RemoteWithExtrasException extends RemoteException {
|
|||
public boolean isDoNotRetry() {
|
||||
return this.doNotRetry;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if the server was considered overloaded when the exception was thrown.
|
||||
*/
|
||||
public boolean isServerOverloaded() {
|
||||
return serverOverloaded;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -40,6 +40,23 @@ public class TestAsyncConnectionConfiguration {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncConnectionConfiguration.class);
|
||||
|
||||
@Test
|
||||
public void itHandlesDeprecatedPauseForCQTBE() {
|
||||
Configuration conf = new Configuration();
|
||||
long timeoutMs = 1000;
|
||||
conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, timeoutMs);
|
||||
AsyncConnectionConfiguration config = new AsyncConnectionConfiguration(conf);
|
||||
|
||||
assertTrue(Configuration.isDeprecated(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE));
|
||||
long expected = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
|
||||
assertEquals(expected, config.getPauseNsForServerOverloaded());
|
||||
|
||||
conf = new Configuration();
|
||||
conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, timeoutMs);
|
||||
config = new AsyncConnectionConfiguration(conf);
|
||||
assertEquals(expected, config.getPauseNsForServerOverloaded());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultReadWriteRpcTimeout() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
|
|
|
@ -48,9 +48,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CallDroppedException;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
|
@ -1038,7 +1040,12 @@ public class TestAsyncProcess {
|
|||
}
|
||||
|
||||
private ClusterConnection createHConnection() throws IOException {
|
||||
ClusterConnection hc = createHConnectionCommon();
|
||||
return createHConnection(CONNECTION_CONFIG);
|
||||
}
|
||||
|
||||
private ClusterConnection createHConnection(ConnectionConfiguration configuration)
|
||||
throws IOException {
|
||||
ClusterConnection hc = createHConnectionCommon(configuration);
|
||||
setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
|
||||
setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
|
||||
setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
|
||||
|
@ -1048,8 +1055,9 @@ public class TestAsyncProcess {
|
|||
return hc;
|
||||
}
|
||||
|
||||
private ClusterConnection createHConnectionWithReplicas() throws IOException {
|
||||
ClusterConnection hc = createHConnectionCommon();
|
||||
private ClusterConnection createHConnectionWithReplicas(ConnectionConfiguration configuration)
|
||||
throws IOException {
|
||||
ClusterConnection hc = createHConnectionCommon(configuration);
|
||||
setMockLocation(hc, DUMMY_BYTES_1, hrls1);
|
||||
setMockLocation(hc, DUMMY_BYTES_2, hrls2);
|
||||
setMockLocation(hc, DUMMY_BYTES_3, hrls3);
|
||||
|
@ -1076,13 +1084,14 @@ public class TestAsyncProcess {
|
|||
Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result);
|
||||
}
|
||||
|
||||
private ClusterConnection createHConnectionCommon() {
|
||||
private ClusterConnection createHConnectionCommon(
|
||||
ConnectionConfiguration connectionConfiguration) {
|
||||
ClusterConnection hc = Mockito.mock(ClusterConnection.class);
|
||||
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
|
||||
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
|
||||
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
|
||||
Mockito.when(hc.getConfiguration()).thenReturn(CONF);
|
||||
Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
|
||||
Mockito.when(hc.getConnectionConfiguration()).thenReturn(connectionConfiguration);
|
||||
return hc;
|
||||
}
|
||||
|
||||
|
@ -1608,11 +1617,11 @@ public class TestAsyncProcess {
|
|||
// TODO: this is kind of timing dependent... perhaps it should detect from createCaller
|
||||
// that the replica call has happened and that way control the ordering.
|
||||
Configuration conf = new Configuration();
|
||||
ClusterConnection conn = createHConnectionWithReplicas();
|
||||
conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
|
||||
if (retries >= 0) {
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
|
||||
}
|
||||
ClusterConnection conn = createHConnectionWithReplicas(new ConnectionConfiguration(conf));
|
||||
MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
|
||||
ap.setCallDelays(primaryMs, replicaMs);
|
||||
return ap;
|
||||
|
@ -1736,20 +1745,31 @@ public class TestAsyncProcess {
|
|||
}
|
||||
|
||||
/**
|
||||
* Test and make sure we could use a special pause setting when retry with
|
||||
* CallQueueTooBigException, see HBASE-17114
|
||||
* @throws Exception if unexpected error happened during test
|
||||
* Below tests make sure we could use a special pause setting when retry an exception
|
||||
* where {@link HBaseServerException#isServerOverloaded(Throwable)} is true, see HBASE-17114
|
||||
*/
|
||||
|
||||
@Test
|
||||
public void testRetryPauseWithCallQueueTooBigException() throws Exception {
|
||||
Configuration myConf = new Configuration(CONF);
|
||||
public void testRetryPauseWhenServerOverloadedDueToCQTBE() throws Exception {
|
||||
testRetryPauseWhenServerIsOverloaded(new CallQueueTooBigException());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetryPauseWhenServerOverloadedDueToCDE() throws Exception {
|
||||
testRetryPauseWhenServerIsOverloaded(new CallDroppedException());
|
||||
}
|
||||
|
||||
private void testRetryPauseWhenServerIsOverloaded(
|
||||
HBaseServerException exception) throws IOException {
|
||||
Configuration conf = new Configuration(CONF);
|
||||
final long specialPause = 500L;
|
||||
final int retries = 1;
|
||||
myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
|
||||
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
|
||||
ClusterConnection conn = new MyConnectionImpl(myConf);
|
||||
conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, specialPause);
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
|
||||
|
||||
ClusterConnection conn = new MyConnectionImpl(conf);
|
||||
AsyncProcessWithFailure ap =
|
||||
new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
|
||||
new AsyncProcessWithFailure(conn, conf, exception);
|
||||
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
|
||||
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
|
||||
|
||||
|
@ -1779,8 +1799,8 @@ public class TestAsyncProcess {
|
|||
|
||||
// check and confirm normal IOE will use the normal pause
|
||||
final long normalPause =
|
||||
myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
|
||||
conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
ap = new AsyncProcessWithFailure(conn, conf, new IOException());
|
||||
bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
|
||||
mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
|
||||
Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
|
||||
|
@ -1806,9 +1826,9 @@ public class TestAsyncProcess {
|
|||
|
||||
@Test
|
||||
public void testRetryWithExceptionClearsMetaCache() throws Exception {
|
||||
ClusterConnection conn = createHConnection();
|
||||
Configuration myConf = conn.getConfiguration();
|
||||
Configuration myConf = new Configuration(CONF);
|
||||
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
|
||||
ClusterConnection conn = createHConnection(new ConnectionConfiguration(myConf));
|
||||
|
||||
AsyncProcessWithFailure ap =
|
||||
new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test"));
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ ClientTests.class, SmallTests.class})
|
||||
public class TestConnectionConfiguration {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestConnectionConfiguration.class);
|
||||
|
||||
@Test
|
||||
public void itHandlesDeprecatedPauseForCQTBE() {
|
||||
Configuration conf = new Configuration();
|
||||
long timeoutMs = 1000;
|
||||
conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, timeoutMs);
|
||||
ConnectionConfiguration config = new ConnectionConfiguration(conf);
|
||||
|
||||
assertTrue(Configuration.isDeprecated(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE));
|
||||
assertEquals(timeoutMs, config.getPauseMillisForServerOverloaded());
|
||||
|
||||
conf = new Configuration();
|
||||
conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, timeoutMs);
|
||||
config = new ConnectionConfiguration(conf);
|
||||
assertEquals(timeoutMs, config.getPauseMillisForServerOverloaded());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.CallDroppedException;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ ClientTests.class, SmallTests.class})
|
||||
public class TestRpcRetryingCallerImpl {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRpcRetryingCallerImpl.class);
|
||||
|
||||
@Test
|
||||
public void itUsesSpecialPauseForCQTBE() throws Exception {
|
||||
itUsesSpecialPauseForServerOverloaded(CallQueueTooBigException.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void itUsesSpecialPauseForCDE() throws Exception {
|
||||
itUsesSpecialPauseForServerOverloaded(CallDroppedException.class);
|
||||
}
|
||||
|
||||
private void itUsesSpecialPauseForServerOverloaded(
|
||||
Class<? extends HBaseServerException> exceptionClass) throws Exception {
|
||||
|
||||
// the actual values don't matter here as long as they're distinct.
|
||||
// the ThrowingCallable will assert that the passed in pause from RpcRetryingCallerImpl
|
||||
// matches the specialPauseMillis
|
||||
long pauseMillis = 1;
|
||||
long specialPauseMillis = 2;
|
||||
|
||||
RpcRetryingCallerImpl<Void> caller =
|
||||
new RpcRetryingCallerImpl<>(pauseMillis, specialPauseMillis, 2, 0);
|
||||
|
||||
RetryingCallable<Void> callable = new ThrowingCallable(
|
||||
CallQueueTooBigException.class, specialPauseMillis);
|
||||
try {
|
||||
caller.callWithRetries(callable, 5000);
|
||||
fail("Expected " + exceptionClass.getSimpleName());
|
||||
} catch (RetriesExhaustedException e) {
|
||||
assertTrue(e.getCause() instanceof HBaseServerException);
|
||||
}
|
||||
}
|
||||
|
||||
private static class ThrowingCallable implements RetryingCallable<Void> {
|
||||
private final Class<? extends HBaseServerException> exceptionClass;
|
||||
private final long specialPauseMillis;
|
||||
|
||||
public ThrowingCallable(Class<? extends HBaseServerException> exceptionClass,
|
||||
long specialPauseMillis) {
|
||||
this.exceptionClass = exceptionClass;
|
||||
this.specialPauseMillis = specialPauseMillis;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(boolean reload) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void throwable(Throwable t, boolean retrying) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExceptionMessageAdditionalDetail() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sleep(long pause, int tries) {
|
||||
assertEquals(pause, specialPauseMillis);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void call(int callTimeout) throws Exception {
|
||||
throw exceptionClass.getConstructor().newInstance();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ ClientTests.class, SmallTests.class })
|
||||
public class TestRemoteWithExtrasException {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRemoteWithExtrasException.class);
|
||||
|
||||
/**
|
||||
* test verifies that we honor the inherent value of an exception for isServerOverloaded.
|
||||
* We don't want a false value passed into RemoteWithExtrasExceptions to override the
|
||||
* inherent value of an exception if it's already true. This could be due to an out of date
|
||||
* server not sending the proto field we expect.
|
||||
*/
|
||||
@Test
|
||||
public void itUsesExceptionDefaultValueForServerOverloaded() {
|
||||
// pass false for server overloaded, we still expect the exception to be true due to
|
||||
// the exception type
|
||||
RemoteWithExtrasException ex =
|
||||
new RemoteWithExtrasException(ServerOverloadedException.class.getName(),
|
||||
"server is overloaded", false, false);
|
||||
IOException result = ex.unwrapRemoteException();
|
||||
|
||||
assertEquals(result.getClass(), ServerOverloadedException.class);
|
||||
assertTrue(((ServerOverloadedException) result).isServerOverloaded());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void itUsesPassedServerOverloadedValue() {
|
||||
String exceptionClass = HBaseServerException.class.getName();
|
||||
String message = "server is overloaded";
|
||||
RemoteWithExtrasException ex =
|
||||
new RemoteWithExtrasException(exceptionClass, message, false, false);
|
||||
IOException result = ex.unwrapRemoteException();
|
||||
|
||||
assertTrue(result instanceof HBaseServerException);
|
||||
assertFalse(((HBaseServerException) result).isServerOverloaded());
|
||||
|
||||
// run again with true value passed in
|
||||
ex = new RemoteWithExtrasException(exceptionClass, message, false, true);
|
||||
result = ex.unwrapRemoteException();
|
||||
|
||||
assertTrue(result instanceof HBaseServerException);
|
||||
// expect true this time
|
||||
assertTrue(((HBaseServerException) result).isServerOverloaded());
|
||||
}
|
||||
|
||||
private static class ServerOverloadedException extends HBaseServerException {
|
||||
public ServerOverloadedException(String message) {
|
||||
super(true, message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -831,7 +831,10 @@ public final class HConstants {
|
|||
|
||||
/**
|
||||
* Parameter name for client pause value for special case such as call queue too big, etc.
|
||||
* @deprecated Since 2.5.0, will be removed in 4.0.0. Please use
|
||||
* hbase.client.pause.server.overloaded instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public static final String HBASE_CLIENT_PAUSE_FOR_CQTBE = "hbase.client.pause.cqtbe";
|
||||
|
||||
/**
|
||||
|
|
|
@ -493,12 +493,14 @@ possible configurations would overwhelm and obscure the important.
|
|||
this initial pause amount and how this pause works w/ retries.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.client.pause.cqtbe</name>
|
||||
<name>hbase.client.pause.server.overloaded</name>
|
||||
<value></value>
|
||||
<description>Whether or not to use a special client pause for
|
||||
CallQueueTooBigException (cqtbe). Set this property to a higher value
|
||||
than hbase.client.pause if you observe frequent CQTBE from the same
|
||||
RegionServer and the call queue there keeps full</description>
|
||||
<description>Pause time when encountering an exception indicating a
|
||||
server is overloaded, CallQueueTooBigException or CallDroppedException.
|
||||
Set this property to a higher value than hbase.client.pause if you
|
||||
observe frequent CallQueueTooBigException or CallDroppedException from the same
|
||||
RegionServer and the call queue there keeps filling up. This config used to be
|
||||
called hbase.client.pause.cqtbe, which has been deprecated as of 2.5.0.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.client.retries.number</name>
|
||||
|
|
|
@ -119,6 +119,8 @@ message ExceptionResponse {
|
|||
optional int32 port = 4;
|
||||
// Set if we are NOT to retry on receipt of this exception
|
||||
optional bool do_not_retry = 5;
|
||||
// Set true if the server was considered to be overloaded when exception was thrown
|
||||
optional bool server_overloaded = 6;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Optional;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
|
||||
|
@ -320,6 +321,9 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
|
|||
RegionMovedException rme = (RegionMovedException)t;
|
||||
exceptionBuilder.setHostname(rme.getHostname());
|
||||
exceptionBuilder.setPort(rme.getPort());
|
||||
} else if (t instanceof HBaseServerException) {
|
||||
HBaseServerException hse = (HBaseServerException) t;
|
||||
exceptionBuilder.setServerOverloaded(hse.isServerOverloaded());
|
||||
}
|
||||
// Set the exception as the result of the method invocation.
|
||||
headerBuilder.setException(exceptionBuilder.build());
|
||||
|
|
|
@ -111,6 +111,8 @@ public class HConnectionTestingUtility {
|
|||
throws IOException {
|
||||
ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class);
|
||||
Mockito.when(c.getConfiguration()).thenReturn(conf);
|
||||
ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration(conf);
|
||||
Mockito.when(c.getConnectionConfiguration()).thenReturn(connectionConfiguration);
|
||||
Mockito.doNothing().when(c).close();
|
||||
// Make it so we return a particular location when asked.
|
||||
final HRegionLocation loc = new HRegionLocation(hri, sn);
|
||||
|
@ -134,9 +136,10 @@ public class HConnectionTestingUtility {
|
|||
}
|
||||
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
|
||||
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
|
||||
Mockito.when(c.getAsyncProcess()).thenReturn(
|
||||
AsyncProcess asyncProcess =
|
||||
new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf),
|
||||
RpcControllerFactory.instantiate(conf)));
|
||||
RpcControllerFactory.instantiate(conf));
|
||||
Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
|
||||
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
|
||||
RpcRetryingCallerFactory.instantiate(conf,
|
||||
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -36,9 +37,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ipc.CallRunner;
|
||||
import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
|
||||
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
||||
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
|
||||
|
@ -56,31 +59,46 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
|||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestAsyncClientPauseForCallQueueTooBig {
|
||||
public class TestAsyncClientPauseForServerOverloaded {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class);
|
||||
HBaseClassTestRule.forClass(TestAsyncClientPauseForServerOverloaded.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("CQTBE");
|
||||
private static TableName TABLE_NAME = TableName.valueOf("ServerOverloaded");
|
||||
|
||||
private static byte[] FAMILY = Bytes.toBytes("Family");
|
||||
|
||||
private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
|
||||
|
||||
private static long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1);
|
||||
private static long PAUSE_FOR_SERVER_OVERLOADED_NANOS = TimeUnit.SECONDS.toNanos(1);
|
||||
private static long PAUSE_FOR_SERVER_OVERLOADED_MILLIS = TimeUnit.NANOSECONDS.toMillis(
|
||||
PAUSE_FOR_SERVER_OVERLOADED_NANOS);
|
||||
|
||||
private static AsyncConnection CONN;
|
||||
|
||||
private static boolean FAIL = false;
|
||||
private static volatile FailMode MODE = null;
|
||||
|
||||
private static ConcurrentMap<MethodDescriptor, AtomicInteger> INVOKED = new ConcurrentHashMap<>();
|
||||
enum FailMode {
|
||||
CALL_QUEUE_TOO_BIG,
|
||||
CALL_DROPPED;
|
||||
|
||||
public static final class CQTBERpcScheduler extends SimpleRpcScheduler {
|
||||
private ConcurrentMap<MethodDescriptor, AtomicInteger> invoked = new ConcurrentHashMap<>();
|
||||
|
||||
public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
|
||||
// this is for test scan, where we will send a open scanner first and then a next, and we
|
||||
// expect that we hit CQTBE two times.
|
||||
private boolean shouldFail(CallRunner callRunner) {
|
||||
MethodDescriptor method = callRunner.getRpcCall().getMethod();
|
||||
return invoked.computeIfAbsent(method,
|
||||
k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class OverloadedRpcScheduler extends SimpleRpcScheduler {
|
||||
|
||||
public OverloadedRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
|
||||
int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
|
||||
Abortable server, int highPriorityLevel) {
|
||||
super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
|
||||
|
@ -89,25 +107,36 @@ public class TestAsyncClientPauseForCallQueueTooBig {
|
|||
|
||||
@Override
|
||||
public boolean dispatch(CallRunner callTask) {
|
||||
if (FAIL) {
|
||||
MethodDescriptor method = callTask.getRpcCall().getMethod();
|
||||
// this is for test scan, where we will send a open scanner first and then a next, and we
|
||||
// expect that we hit CQTBE two times.
|
||||
if (INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0) {
|
||||
if (MODE == FailMode.CALL_QUEUE_TOO_BIG && MODE.shouldFail(callTask)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return super.dispatch(callTask);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory {
|
||||
public static final class OverloadedQueue extends TestPluggableQueueImpl {
|
||||
|
||||
public OverloadedQueue(int maxQueueLength, PriorityFunction priority, Configuration conf) {
|
||||
super(maxQueueLength, priority, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean offer(CallRunner callRunner) {
|
||||
if (MODE == FailMode.CALL_DROPPED && MODE.shouldFail(callRunner)) {
|
||||
callRunner.drop();
|
||||
return true;
|
||||
}
|
||||
return super.offer(callRunner);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class OverloadedRpcSchedulerFactory extends SimpleRpcSchedulerFactory {
|
||||
|
||||
@Override
|
||||
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
|
||||
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
||||
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
|
||||
return new CQTBERpcScheduler(conf, handlerCount,
|
||||
return new OverloadedRpcScheduler(conf, handlerCount,
|
||||
conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
|
||||
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
|
||||
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
|
||||
|
@ -122,12 +151,16 @@ public class TestAsyncClientPauseForCallQueueTooBig {
|
|||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
|
||||
UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
|
||||
TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS));
|
||||
UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable");
|
||||
UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
|
||||
OverloadedQueue.class, PluggableBlockingQueue.class);
|
||||
UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
|
||||
CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class);
|
||||
OverloadedRpcSchedulerFactory.class, RpcSchedulerFactory.class);
|
||||
UTIL.startMiniCluster(1);
|
||||
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
|
||||
|
||||
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||
conf.setLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, PAUSE_FOR_SERVER_OVERLOADED_MILLIS);
|
||||
CONN = ConnectionFactory.createAsyncConnection(conf).get();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -143,23 +176,29 @@ public class TestAsyncClientPauseForCallQueueTooBig {
|
|||
table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
|
||||
}
|
||||
}
|
||||
FAIL = true;
|
||||
MODE = FailMode.CALL_QUEUE_TOO_BIG;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownAfterTest() throws IOException {
|
||||
FAIL = false;
|
||||
INVOKED.clear();
|
||||
for (FailMode mode : FailMode.values()) {
|
||||
mode.invoked.clear();
|
||||
}
|
||||
MODE = null;
|
||||
UTIL.getAdmin().disableTable(TABLE_NAME);
|
||||
UTIL.getAdmin().deleteTable(TABLE_NAME);
|
||||
}
|
||||
|
||||
private void assertTime(Callable<Void> callable, long time) throws Exception {
|
||||
for (FailMode mode : FailMode.values()) {
|
||||
MODE = mode;
|
||||
|
||||
long startNs = System.nanoTime();
|
||||
callable.call();
|
||||
long costNs = System.nanoTime() - startNs;
|
||||
assertTrue(costNs > time);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGet() throws Exception {
|
||||
|
@ -167,7 +206,7 @@ public class TestAsyncClientPauseForCallQueueTooBig {
|
|||
Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get();
|
||||
assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
|
||||
return null;
|
||||
}, PAUSE_FOR_CQTBE_NS);
|
||||
}, PAUSE_FOR_SERVER_OVERLOADED_NANOS);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -181,7 +220,7 @@ public class TestAsyncClientPauseForCallQueueTooBig {
|
|||
}
|
||||
}
|
||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
|
||||
}, PAUSE_FOR_CQTBE_NS);
|
||||
}, PAUSE_FOR_SERVER_OVERLOADED_NANOS);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -197,6 +236,6 @@ public class TestAsyncClientPauseForCallQueueTooBig {
|
|||
assertNull(scanner.next());
|
||||
}
|
||||
return null;
|
||||
}, PAUSE_FOR_CQTBE_NS * 2);
|
||||
}, PAUSE_FOR_SERVER_OVERLOADED_NANOS * 2);
|
||||
}
|
||||
}
|
|
@ -22,9 +22,9 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -41,8 +41,11 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CallDroppedException;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseServerException;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
|
@ -79,7 +82,6 @@ import org.junit.experimental.categories.Category;
|
|||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
|
||||
|
@ -1086,6 +1088,100 @@ public class TestConnectionImplementation {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocateRegionsRetrySpecialPauseCQTBE() throws IOException {
|
||||
testLocateRegionsRetrySpecialPause(CallQueueTooBigException.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocateRegionsRetrySpecialPauseCDE() throws IOException {
|
||||
testLocateRegionsRetrySpecialPause(CallDroppedException.class);
|
||||
}
|
||||
|
||||
private void testLocateRegionsRetrySpecialPause(
|
||||
Class<? extends HBaseServerException> exceptionClass) throws IOException {
|
||||
|
||||
int regionReplication = 3;
|
||||
byte[] family = Bytes.toBytes("cf");
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
|
||||
// Create a table with region replicas
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder
|
||||
.newBuilder(tableName)
|
||||
.setRegionReplication(regionReplication)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
|
||||
TEST_UTIL.getAdmin().createTable(builder.build());
|
||||
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
|
||||
conf.setClass(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
|
||||
ThrowingCallerFactory.class, RpcRetryingCallerFactory.class);
|
||||
conf.setClass("testSpecialPauseException", exceptionClass, HBaseServerException.class);
|
||||
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
// normal pause very short, 10 millis
|
||||
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10);
|
||||
|
||||
// special pause 10x longer, so we can detect it
|
||||
long specialPauseMillis = 1000;
|
||||
conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
|
||||
specialPauseMillis);
|
||||
|
||||
try (ConnectionImplementation con =
|
||||
(ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
|
||||
// Get locations of the regions of the table
|
||||
|
||||
long start = System.nanoTime();
|
||||
try {
|
||||
con.locateRegion(tableName, new byte[0], false, true, 0);
|
||||
} catch (HBaseServerException e) {
|
||||
assertTrue(e.isServerOverloaded());
|
||||
// pass: expected
|
||||
}
|
||||
assertTrue(System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(specialPauseMillis));
|
||||
} finally {
|
||||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
private static class ThrowingCallerFactory extends RpcRetryingCallerFactory {
|
||||
|
||||
private final Class<? extends HBaseServerException> exceptionClass;
|
||||
|
||||
public ThrowingCallerFactory(Configuration conf) {
|
||||
super(conf);
|
||||
this.exceptionClass = conf.getClass("testSpecialPauseException",
|
||||
null, HBaseServerException.class);
|
||||
}
|
||||
|
||||
@Override public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
|
||||
return newCaller();
|
||||
}
|
||||
|
||||
@Override public <T> RpcRetryingCaller<T> newCaller() {
|
||||
return new RpcRetryingCaller<T>() {
|
||||
@Override public void cancel() {
|
||||
|
||||
}
|
||||
|
||||
@Override public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
return callWithoutRetries(null, 0);
|
||||
}
|
||||
|
||||
@Override public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
try {
|
||||
throw exceptionClass.getConstructor().newInstance();
|
||||
} catch (IllegalAccessException | InstantiationException
|
||||
| InvocationTargetException | NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaLookupThreadPoolCreated() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
|
|
Loading…
Reference in New Issue