HBASE-20493 Port HBASE-19994 (Create a new class for RPC throttling exception, make it retryable) to branch-1
This commit is contained in:
parent
71c573239b
commit
e793e7c30c
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||
import org.apache.hadoop.hbase.ipc.FailedServerException;
|
||||
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
|
||||
import org.apache.hadoop.hbase.quotas.ThrottlingException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
|
@ -60,7 +61,8 @@ public final class ClientExceptionsUtil {
|
|||
|
||||
public static boolean isSpecialException(Throwable cur) {
|
||||
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
||||
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|
||||
|| cur instanceof RegionTooBusyException
|
||||
|| cur instanceof ThrottlingException || cur instanceof RpcThrottlingException
|
||||
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|
||||
|| cur instanceof CallQueueTooBigException || cur instanceof CallDroppedException
|
||||
|| cur instanceof NotServingRegionException || cur instanceof RequestTooBigException);
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Describe the throttling result. TODO: At some point this will be handled on the client side to
|
||||
* prevent operation to go on the server if the waitInterval is greater than the one got as result
|
||||
* of this exception.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class RpcThrottlingException extends HBaseIOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public enum Type {
|
||||
NumRequestsExceeded, RequestSizeExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded,
|
||||
WriteSizeExceeded, ReadSizeExceeded,
|
||||
}
|
||||
|
||||
private static final String[] MSG_TYPE =
|
||||
new String[] { "number of requests exceeded", "request size limit exceeded",
|
||||
"number of read requests exceeded", "number of write requests exceeded",
|
||||
"write size limit exceeded", "read size limit exceeded", };
|
||||
|
||||
private static final String MSG_WAIT = " - wait ";
|
||||
|
||||
private long waitInterval;
|
||||
private Type type;
|
||||
|
||||
public RpcThrottlingException(String msg) {
|
||||
super(msg);
|
||||
|
||||
// Dirty workaround to get the information after
|
||||
// ((RemoteException)e.getCause()).unwrapRemoteException()
|
||||
for (int i = 0; i < MSG_TYPE.length; ++i) {
|
||||
int index = msg.indexOf(MSG_TYPE[i]);
|
||||
if (index >= 0) {
|
||||
String waitTimeStr = msg.substring(index + MSG_TYPE[i].length() + MSG_WAIT.length());
|
||||
type = Type.values()[i];
|
||||
waitInterval = timeFromString(waitTimeStr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public RpcThrottlingException(final Type type, final long waitInterval, final String msg) {
|
||||
super(msg);
|
||||
this.waitInterval = waitInterval;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public Type getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public long getWaitInterval() {
|
||||
return this.waitInterval;
|
||||
}
|
||||
|
||||
public static void throwNumRequestsExceeded(final long waitInterval) throws
|
||||
RpcThrottlingException {
|
||||
throwThrottlingException(Type.NumRequestsExceeded, waitInterval);
|
||||
}
|
||||
|
||||
public static void throwRequestSizeExceeded(final long waitInterval)
|
||||
throws RpcThrottlingException {
|
||||
throwThrottlingException(Type.RequestSizeExceeded, waitInterval);
|
||||
}
|
||||
|
||||
public static void throwNumReadRequestsExceeded(final long waitInterval)
|
||||
throws RpcThrottlingException {
|
||||
throwThrottlingException(Type.NumReadRequestsExceeded, waitInterval);
|
||||
}
|
||||
|
||||
public static void throwNumWriteRequestsExceeded(final long waitInterval)
|
||||
throws RpcThrottlingException {
|
||||
throwThrottlingException(Type.NumWriteRequestsExceeded, waitInterval);
|
||||
}
|
||||
|
||||
public static void throwWriteSizeExceeded(final long waitInterval) throws RpcThrottlingException {
|
||||
throwThrottlingException(Type.WriteSizeExceeded, waitInterval);
|
||||
}
|
||||
|
||||
public static void throwReadSizeExceeded(final long waitInterval) throws RpcThrottlingException {
|
||||
throwThrottlingException(Type.ReadSizeExceeded, waitInterval);
|
||||
}
|
||||
|
||||
private static void throwThrottlingException(final Type type, final long waitInterval)
|
||||
throws RpcThrottlingException {
|
||||
String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + StringUtils.formatTime(waitInterval);
|
||||
throw new RpcThrottlingException(type, waitInterval, msg);
|
||||
}
|
||||
|
||||
private static long timeFromString(String timeDiff) {
|
||||
Pattern[] patterns =
|
||||
new Pattern[] { Pattern.compile("^(\\d+\\.\\d\\d)sec"),
|
||||
Pattern.compile("^(\\d+)mins, (\\d+\\.\\d\\d)sec"),
|
||||
Pattern.compile("^(\\d+)hrs, (\\d+)mins, (\\d+\\.\\d\\d)sec") };
|
||||
|
||||
for (int i = 0; i < patterns.length; ++i) {
|
||||
Matcher m = patterns[i].matcher(timeDiff);
|
||||
if (m.find()) {
|
||||
long time = Math.round(Float.parseFloat(m.group(1 + i)) * 1000);
|
||||
if (i > 0) {
|
||||
time += Long.parseLong(m.group(i)) * (60 * 1000);
|
||||
}
|
||||
if (i > 1) {
|
||||
time += Long.parseLong(m.group(i - 1)) * (60 * 60 * 1000);
|
||||
}
|
||||
return time;
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
* Describe the throttling result. TODO: At some point this will be handled on the client side to
|
||||
* prevent operation to go on the server if the waitInterval is grater than the one got as result of
|
||||
* this exception.
|
||||
* @deprecated replaced by {@link RpcThrottlingException}
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
|
|
|
@ -51,7 +51,7 @@ public class DefaultOperationQuota implements OperationQuota {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException {
|
||||
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
|
||||
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
|
||||
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
|
||||
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
|
||||
|
|
|
@ -35,7 +35,7 @@ final class NoopOperationQuota implements OperationQuota {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException {
|
||||
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
|
||||
// no-op
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ final class NoopQuotaLimiter implements QuotaLimiter {
|
|||
|
||||
@Override
|
||||
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
|
||||
long estimateReadSize) throws ThrottlingException {
|
||||
long estimateReadSize) throws RpcThrottlingException {
|
||||
// no-op
|
||||
}
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ public interface OperationQuota {
|
|||
* @param numScans number of long-read operation that will be performed
|
||||
* @throws ThrottlingException if the operation cannot be performed
|
||||
*/
|
||||
void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException;
|
||||
void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException;
|
||||
|
||||
/** Cleanup method on operation completion */
|
||||
void close();
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas;
|
|||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
|
||||
|
||||
/**
|
||||
* Internal interface used to interact with the user/table quota.
|
||||
|
@ -35,10 +34,10 @@ public interface QuotaLimiter {
|
|||
* @param estimateWriteSize the write size that will be checked against the available quota
|
||||
* @param readReqs the read requests that will be checked against the available quota
|
||||
* @param estimateReadSize the read size that will be checked against the available quota
|
||||
* @throws ThrottlingException thrown if not enough avialable resources to perform operation.
|
||||
* @throws RpcThrottlingException thrown if not enough avialable resources to perform operation.
|
||||
*/
|
||||
void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize)
|
||||
throws ThrottlingException;
|
||||
throws RpcThrottlingException;
|
||||
|
||||
/**
|
||||
* Removes the specified write and read amount from the quota.
|
||||
|
|
|
@ -50,6 +50,10 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
|
||||
private static final boolean QUOTA_ENABLED_DEFAULT = false;
|
||||
|
||||
public static final String QUOTA_RETRYABLE_THROTTING_EXCEPTION_CONF_KEY =
|
||||
"hbase.quota.retryable.throttlingexception";
|
||||
public static final boolean QUOTA_RETRYABLE_THROTTING_EXCEPTION_DEFAULT = false;
|
||||
|
||||
/** Table descriptor for Quota internal table */
|
||||
public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME);
|
||||
static {
|
||||
|
|
|
@ -16,6 +16,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
@ -43,6 +44,7 @@ public class RegionServerQuotaManager {
|
|||
private final RegionServerServices rsServices;
|
||||
|
||||
private QuotaCache quotaCache = null;
|
||||
private boolean useRetryableThrottlingException;
|
||||
|
||||
public RegionServerQuotaManager(final RegionServerServices rsServices) {
|
||||
this.rsServices = rsServices;
|
||||
|
@ -59,6 +61,10 @@ public class RegionServerQuotaManager {
|
|||
// Initialize quota cache
|
||||
quotaCache = new QuotaCache(rsServices);
|
||||
quotaCache.start();
|
||||
|
||||
useRetryableThrottlingException = rsServices.getConfiguration()
|
||||
.getBoolean(QuotaUtil.QUOTA_RETRYABLE_THROTTING_EXCEPTION_CONF_KEY,
|
||||
QuotaUtil.QUOTA_RETRYABLE_THROTTING_EXCEPTION_DEFAULT);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
|
@ -119,7 +125,7 @@ public class RegionServerQuotaManager {
|
|||
* @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
|
||||
*/
|
||||
public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type)
|
||||
throws IOException, ThrottlingException {
|
||||
throws IOException {
|
||||
switch (type) {
|
||||
case SCAN:
|
||||
return checkQuota(region, 0, 0, 1);
|
||||
|
@ -141,7 +147,7 @@ public class RegionServerQuotaManager {
|
|||
* @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
|
||||
*/
|
||||
public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
|
||||
throws IOException, ThrottlingException {
|
||||
throws IOException {
|
||||
int numWrites = 0;
|
||||
int numReads = 0;
|
||||
for (final ClientProtos.Action action : actions) {
|
||||
|
@ -165,7 +171,7 @@ public class RegionServerQuotaManager {
|
|||
* @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
|
||||
*/
|
||||
private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads,
|
||||
final int numScans) throws IOException, ThrottlingException {
|
||||
final int numScans) throws IOException {
|
||||
User user = RpcServer.getRequestUser();
|
||||
UserGroupInformation ugi;
|
||||
if (user != null) {
|
||||
|
@ -178,11 +184,28 @@ public class RegionServerQuotaManager {
|
|||
OperationQuota quota = getQuota(ugi, table);
|
||||
try {
|
||||
quota.checkQuota(numWrites, numReads, numScans);
|
||||
} catch (ThrottlingException e) {
|
||||
} catch (HBaseIOException e) {
|
||||
LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table
|
||||
+ " numWrites=" + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": "
|
||||
+ e.getMessage());
|
||||
throw e;
|
||||
// Depending on whether we are supposed to throw a retryable IO exeption or not, choose
|
||||
// the correct exception type to (re)throw
|
||||
if (e instanceof ThrottlingException) {
|
||||
if (useRetryableThrottlingException) {
|
||||
throw new RpcThrottlingException(e.getMessage());
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
} else if (e instanceof RpcThrottlingException) {
|
||||
if (useRetryableThrottlingException) {
|
||||
throw e;
|
||||
} else {
|
||||
throw new ThrottlingException(e.getMessage());
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Unexpected exception from quota check", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return quota;
|
||||
}
|
||||
|
|
|
@ -111,31 +111,31 @@ public class TimeBasedLimiter implements QuotaLimiter {
|
|||
|
||||
@Override
|
||||
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
|
||||
long estimateReadSize) throws ThrottlingException {
|
||||
long estimateReadSize) throws RpcThrottlingException {
|
||||
if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
|
||||
ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
|
||||
RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
|
||||
}
|
||||
if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) {
|
||||
ThrottlingException.throwRequestSizeExceeded(
|
||||
RpcThrottlingException.throwRequestSizeExceeded(
|
||||
reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
|
||||
}
|
||||
|
||||
if (estimateWriteSize > 0) {
|
||||
if (!writeReqsLimiter.canExecute(writeReqs)) {
|
||||
ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
|
||||
RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
|
||||
}
|
||||
if (!writeSizeLimiter.canExecute(estimateWriteSize)) {
|
||||
ThrottlingException.throwWriteSizeExceeded(
|
||||
RpcThrottlingException.throwWriteSizeExceeded(
|
||||
writeSizeLimiter.waitInterval(estimateWriteSize));
|
||||
}
|
||||
}
|
||||
|
||||
if (estimateReadSize > 0) {
|
||||
if (!readReqsLimiter.canExecute(readReqs)) {
|
||||
ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
|
||||
RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
|
||||
}
|
||||
if (!readSizeLimiter.canExecute(estimateReadSize)) {
|
||||
ThrottlingException.throwReadSizeExceeded(
|
||||
RpcThrottlingException.throwReadSizeExceeded(
|
||||
readSizeLimiter.waitInterval(estimateReadSize));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
|
||||
|
@ -207,7 +208,7 @@ public class TestQuotaState {
|
|||
try {
|
||||
limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0);
|
||||
fail("Should have thrown ThrottlingException");
|
||||
} catch (ThrottlingException e) {
|
||||
} catch (HBaseIOException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
@ -226,7 +227,7 @@ public class TestQuotaState {
|
|||
try {
|
||||
limiter.checkQuota(1, 1, 0, 0);
|
||||
fail("Should have thrown ThrottlingException");
|
||||
} catch (ThrottlingException e) {
|
||||
} catch (HBaseIOException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
@ -235,7 +236,7 @@ public class TestQuotaState {
|
|||
for (int i = 0; i < availReqs; ++i) {
|
||||
try {
|
||||
limiter.checkQuota(1, 1, 0, 0);
|
||||
} catch (ThrottlingException e) {
|
||||
} catch (HBaseIOException e) {
|
||||
fail("Unexpected ThrottlingException after " + i + " requests. limit=" + availReqs);
|
||||
}
|
||||
limiter.grabQuota(1, 1, 0, 0);
|
||||
|
|
|
@ -61,6 +61,8 @@ public class TestQuotaThrottle {
|
|||
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
|
||||
TEST_UTIL.getConfiguration().setBoolean(
|
||||
QuotaUtil.QUOTA_RETRYABLE_THROTTING_EXCEPTION_CONF_KEY, false);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
|
||||
QuotaCache.setTEST_FORCE_REFRESH(true);
|
||||
|
|
Loading…
Reference in New Issue