HBASE-19994 Create a new class for RPC throttling exception, make it retryable
This commit is contained in:
parent
1c8d9d788f
commit
36680da549
@ -39,7 +39,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||||
import org.apache.hadoop.hbase.ipc.FailedServerException;
|
import org.apache.hadoop.hbase.ipc.FailedServerException;
|
||||||
import org.apache.hadoop.hbase.quotas.ThrottlingException;
|
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@ -60,7 +60,7 @@ public final class ClientExceptionsUtil {
|
|||||||
|
|
||||||
public static boolean isSpecialException(Throwable cur) {
|
public static boolean isSpecialException(Throwable cur) {
|
||||||
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
||||||
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|
|| cur instanceof RegionTooBusyException || cur instanceof RpcThrottlingException
|
||||||
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|
||||||
|| cur instanceof CallQueueTooBigException || cur instanceof CallDroppedException
|
|| cur instanceof CallQueueTooBigException || cur instanceof CallDroppedException
|
||||||
|| cur instanceof NotServingRegionException || cur instanceof RequestTooBigException);
|
|| cur instanceof NotServingRegionException || cur instanceof RequestTooBigException);
|
||||||
@ -73,7 +73,7 @@ public final class ClientExceptionsUtil {
|
|||||||
* - nested exceptions
|
* - nested exceptions
|
||||||
*
|
*
|
||||||
* Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException /
|
* Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException /
|
||||||
* ThrottlingException
|
* RpcThrottlingException
|
||||||
* @return null if we didn't find the exception, the exception otherwise.
|
* @return null if we didn't find the exception, the exception otherwise.
|
||||||
*/
|
*/
|
||||||
public static Throwable findException(Object exception) {
|
public static Throwable findException(Object exception) {
|
||||||
|
@ -0,0 +1,131 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.quotas;
|
||||||
|
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Describe the throttling result. TODO: At some point this will be handled on the client side to
|
||||||
|
* prevent operation to go on the server if the waitInterval is greater than the one got as result
|
||||||
|
* of this exception.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
public class RpcThrottlingException extends HBaseIOException {
|
||||||
|
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
public enum Type {
|
||||||
|
NumRequestsExceeded, RequestSizeExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded,
|
||||||
|
WriteSizeExceeded, ReadSizeExceeded,
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final String[] MSG_TYPE =
|
||||||
|
new String[] { "number of requests exceeded", "request size limit exceeded",
|
||||||
|
"number of read requests exceeded", "number of write requests exceeded",
|
||||||
|
"write size limit exceeded", "read size limit exceeded", };
|
||||||
|
|
||||||
|
private static final String MSG_WAIT = " - wait ";
|
||||||
|
|
||||||
|
private long waitInterval;
|
||||||
|
private Type type;
|
||||||
|
|
||||||
|
public RpcThrottlingException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
|
||||||
|
// Dirty workaround to get the information after
|
||||||
|
// ((RemoteException)e.getCause()).unwrapRemoteException()
|
||||||
|
for (int i = 0; i < MSG_TYPE.length; ++i) {
|
||||||
|
int index = msg.indexOf(MSG_TYPE[i]);
|
||||||
|
if (index >= 0) {
|
||||||
|
String waitTimeStr = msg.substring(index + MSG_TYPE[i].length() + MSG_WAIT.length());
|
||||||
|
type = Type.values()[i];
|
||||||
|
waitInterval = timeFromString(waitTimeStr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public RpcThrottlingException(final Type type, final long waitInterval, final String msg) {
|
||||||
|
super(msg);
|
||||||
|
this.waitInterval = waitInterval;
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Type getType() {
|
||||||
|
return this.type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getWaitInterval() {
|
||||||
|
return this.waitInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void throwNumRequestsExceeded(final long waitInterval) throws
|
||||||
|
RpcThrottlingException {
|
||||||
|
throwThrottlingException(Type.NumRequestsExceeded, waitInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void throwRequestSizeExceeded(final long waitInterval)
|
||||||
|
throws RpcThrottlingException {
|
||||||
|
throwThrottlingException(Type.RequestSizeExceeded, waitInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void throwNumReadRequestsExceeded(final long waitInterval)
|
||||||
|
throws RpcThrottlingException {
|
||||||
|
throwThrottlingException(Type.NumReadRequestsExceeded, waitInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void throwNumWriteRequestsExceeded(final long waitInterval)
|
||||||
|
throws RpcThrottlingException {
|
||||||
|
throwThrottlingException(Type.NumWriteRequestsExceeded, waitInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void throwWriteSizeExceeded(final long waitInterval) throws RpcThrottlingException {
|
||||||
|
throwThrottlingException(Type.WriteSizeExceeded, waitInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void throwReadSizeExceeded(final long waitInterval) throws RpcThrottlingException {
|
||||||
|
throwThrottlingException(Type.ReadSizeExceeded, waitInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void throwThrottlingException(final Type type, final long waitInterval)
|
||||||
|
throws RpcThrottlingException {
|
||||||
|
String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + StringUtils.formatTime(waitInterval);
|
||||||
|
throw new RpcThrottlingException(type, waitInterval, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long timeFromString(String timeDiff) {
|
||||||
|
Pattern[] patterns =
|
||||||
|
new Pattern[] { Pattern.compile("^(\\d+\\.\\d\\d)sec"),
|
||||||
|
Pattern.compile("^(\\d+)mins, (\\d+\\.\\d\\d)sec"),
|
||||||
|
Pattern.compile("^(\\d+)hrs, (\\d+)mins, (\\d+\\.\\d\\d)sec") };
|
||||||
|
|
||||||
|
for (int i = 0; i < patterns.length; ++i) {
|
||||||
|
Matcher m = patterns[i].matcher(timeDiff);
|
||||||
|
if (m.find()) {
|
||||||
|
long time = Math.round(Float.parseFloat(m.group(1 + i)) * 1000);
|
||||||
|
if (i > 0) {
|
||||||
|
time += Long.parseLong(m.group(i)) * (60 * 1000);
|
||||||
|
}
|
||||||
|
if (i > 1) {
|
||||||
|
time += Long.parseLong(m.group(i - 1)) * (60 * 60 * 1000);
|
||||||
|
}
|
||||||
|
return time;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
@ -29,7 +29,10 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||||||
* TODO: At some point this will be handled on the client side to prevent
|
* TODO: At some point this will be handled on the client side to prevent
|
||||||
* operation to go on the server if the waitInterval is grater than the one got
|
* operation to go on the server if the waitInterval is grater than the one got
|
||||||
* as result of this exception.
|
* as result of this exception.
|
||||||
|
*
|
||||||
|
* @deprecated replaced by {@link RpcThrottlingException} since hbase-2.0.0.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
public class ThrottlingException extends QuotaExceededException {
|
public class ThrottlingException extends QuotaExceededException {
|
||||||
private static final long serialVersionUID = 1406576492085155743L;
|
private static final long serialVersionUID = 1406576492085155743L;
|
||||||
|
@ -59,7 +59,7 @@ public class DefaultOperationQuota implements OperationQuota {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void checkQuota(int numWrites, int numReads, int numScans)
|
public void checkQuota(int numWrites, int numReads, int numScans)
|
||||||
throws ThrottlingException {
|
throws RpcThrottlingException {
|
||||||
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
|
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
|
||||||
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
|
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
|
||||||
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
|
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
|
||||||
|
@ -43,7 +43,7 @@ class NoopOperationQuota implements OperationQuota {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void checkQuota(int numWrites, int numReads, int numScans)
|
public void checkQuota(int numWrites, int numReads, int numScans)
|
||||||
throws ThrottlingException {
|
throws RpcThrottlingException {
|
||||||
// no-op
|
// no-op
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ class NoopQuotaLimiter implements QuotaLimiter {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void checkQuota(long estimateWriteSize, long estimateReadSize)
|
public void checkQuota(long estimateWriteSize, long estimateReadSize)
|
||||||
throws ThrottlingException {
|
throws RpcThrottlingException {
|
||||||
// no-op
|
// no-op
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,10 +41,11 @@ public interface OperationQuota {
|
|||||||
* @param numWrites number of write operation that will be performed
|
* @param numWrites number of write operation that will be performed
|
||||||
* @param numReads number of small-read operation that will be performed
|
* @param numReads number of small-read operation that will be performed
|
||||||
* @param numScans number of long-read operation that will be performed
|
* @param numScans number of long-read operation that will be performed
|
||||||
* @throws ThrottlingException if the operation cannot be performed
|
* @throws RpcThrottlingException if the operation cannot be performed because
|
||||||
|
* RPC quota is exceeded.
|
||||||
*/
|
*/
|
||||||
void checkQuota(int numWrites, int numReads, int numScans)
|
void checkQuota(int numWrites, int numReads, int numScans)
|
||||||
throws ThrottlingException;
|
throws RpcThrottlingException;
|
||||||
|
|
||||||
/** Cleanup method on operation completion */
|
/** Cleanup method on operation completion */
|
||||||
void close();
|
void close();
|
||||||
|
@ -33,10 +33,10 @@ public interface QuotaLimiter {
|
|||||||
*
|
*
|
||||||
* @param estimateWriteSize the write size that will be checked against the available quota
|
* @param estimateWriteSize the write size that will be checked against the available quota
|
||||||
* @param estimateReadSize the read size that will be checked against the available quota
|
* @param estimateReadSize the read size that will be checked against the available quota
|
||||||
* @throws ThrottlingException thrown if not enough avialable resources to perform operation.
|
* @throws RpcThrottlingException thrown if not enough avialable resources to perform operation.
|
||||||
*/
|
*/
|
||||||
void checkQuota(long estimateWriteSize, long estimateReadSize)
|
void checkQuota(long estimateWriteSize, long estimateReadSize)
|
||||||
throws ThrottlingException;
|
throws RpcThrottlingException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes the specified write and read amount from the quota.
|
* Removes the specified write and read amount from the quota.
|
||||||
|
@ -127,10 +127,10 @@ public class RegionServerRpcQuotaManager {
|
|||||||
* @param region the region where the operation will be performed
|
* @param region the region where the operation will be performed
|
||||||
* @param type the operation type
|
* @param type the operation type
|
||||||
* @return the OperationQuota
|
* @return the OperationQuota
|
||||||
* @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
|
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
|
||||||
*/
|
*/
|
||||||
public OperationQuota checkQuota(final Region region,
|
public OperationQuota checkQuota(final Region region,
|
||||||
final OperationQuota.OperationType type) throws IOException, ThrottlingException {
|
final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case SCAN: return checkQuota(region, 0, 0, 1);
|
case SCAN: return checkQuota(region, 0, 0, 1);
|
||||||
case GET: return checkQuota(region, 0, 1, 0);
|
case GET: return checkQuota(region, 0, 1, 0);
|
||||||
@ -146,10 +146,10 @@ public class RegionServerRpcQuotaManager {
|
|||||||
* @param region the region where the operation will be performed
|
* @param region the region where the operation will be performed
|
||||||
* @param actions the "multi" actions to perform
|
* @param actions the "multi" actions to perform
|
||||||
* @return the OperationQuota
|
* @return the OperationQuota
|
||||||
* @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
|
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
|
||||||
*/
|
*/
|
||||||
public OperationQuota checkQuota(final Region region,
|
public OperationQuota checkQuota(final Region region,
|
||||||
final List<ClientProtos.Action> actions) throws IOException, ThrottlingException {
|
final List<ClientProtos.Action> actions) throws IOException, RpcThrottlingException {
|
||||||
int numWrites = 0;
|
int numWrites = 0;
|
||||||
int numReads = 0;
|
int numReads = 0;
|
||||||
for (final ClientProtos.Action action: actions) {
|
for (final ClientProtos.Action action: actions) {
|
||||||
@ -171,11 +171,11 @@ public class RegionServerRpcQuotaManager {
|
|||||||
* @param numReads number of short-reads to perform
|
* @param numReads number of short-reads to perform
|
||||||
* @param numScans number of scan to perform
|
* @param numScans number of scan to perform
|
||||||
* @return the OperationQuota
|
* @return the OperationQuota
|
||||||
* @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
|
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
|
||||||
*/
|
*/
|
||||||
private OperationQuota checkQuota(final Region region,
|
private OperationQuota checkQuota(final Region region,
|
||||||
final int numWrites, final int numReads, final int numScans)
|
final int numWrites, final int numReads, final int numScans)
|
||||||
throws IOException, ThrottlingException {
|
throws IOException, RpcThrottlingException {
|
||||||
Optional<User> user = RpcServer.getRequestUser();
|
Optional<User> user = RpcServer.getRequestUser();
|
||||||
UserGroupInformation ugi;
|
UserGroupInformation ugi;
|
||||||
if (user.isPresent()) {
|
if (user.isPresent()) {
|
||||||
@ -188,7 +188,7 @@ public class RegionServerRpcQuotaManager {
|
|||||||
OperationQuota quota = getQuota(ugi, table);
|
OperationQuota quota = getQuota(ugi, table);
|
||||||
try {
|
try {
|
||||||
quota.checkQuota(numWrites, numReads, numScans);
|
quota.checkQuota(numWrites, numReads, numScans);
|
||||||
} catch (ThrottlingException e) {
|
} catch (RpcThrottlingException e) {
|
||||||
LOG.debug("Throttling exception for user=" + ugi.getUserName() +
|
LOG.debug("Throttling exception for user=" + ugi.getUserName() +
|
||||||
" table=" + table + " numWrites=" + numWrites +
|
" table=" + table + " numWrites=" + numWrites +
|
||||||
" numReads=" + numReads + " numScans=" + numScans +
|
" numReads=" + numReads + " numScans=" + numScans +
|
||||||
|
@ -110,30 +110,30 @@ public class TimeBasedLimiter implements QuotaLimiter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void checkQuota(long writeSize, long readSize) throws ThrottlingException {
|
public void checkQuota(long writeSize, long readSize) throws RpcThrottlingException {
|
||||||
if (!reqsLimiter.canExecute()) {
|
if (!reqsLimiter.canExecute()) {
|
||||||
ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
|
RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
|
||||||
}
|
}
|
||||||
if (!reqSizeLimiter.canExecute(writeSize + readSize)) {
|
if (!reqSizeLimiter.canExecute(writeSize + readSize)) {
|
||||||
ThrottlingException.throwRequestSizeExceeded(reqSizeLimiter
|
RpcThrottlingException.throwRequestSizeExceeded(reqSizeLimiter
|
||||||
.waitInterval(writeSize + readSize));
|
.waitInterval(writeSize + readSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (writeSize > 0) {
|
if (writeSize > 0) {
|
||||||
if (!writeReqsLimiter.canExecute()) {
|
if (!writeReqsLimiter.canExecute()) {
|
||||||
ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
|
RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
|
||||||
}
|
}
|
||||||
if (!writeSizeLimiter.canExecute(writeSize)) {
|
if (!writeSizeLimiter.canExecute(writeSize)) {
|
||||||
ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize));
|
RpcThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (readSize > 0) {
|
if (readSize > 0) {
|
||||||
if (!readReqsLimiter.canExecute()) {
|
if (!readReqsLimiter.canExecute()) {
|
||||||
ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
|
RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
|
||||||
}
|
}
|
||||||
if (!readSizeLimiter.canExecute(readSize)) {
|
if (!readSizeLimiter.canExecute(readSize)) {
|
||||||
ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize));
|
RpcThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.*;
|
|||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||||
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
||||||
import org.apache.hadoop.hbase.quotas.ThrottlingException;
|
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
@ -193,12 +193,12 @@ public class TestMetaCache {
|
|||||||
|
|
||||||
public static List<Throwable> metaCachePreservingExceptions() {
|
public static List<Throwable> metaCachePreservingExceptions() {
|
||||||
return new ArrayList<Throwable>() {{
|
return new ArrayList<Throwable>() {{
|
||||||
add(new RegionOpeningException(" "));
|
add(new RegionOpeningException(" "));
|
||||||
add(new RegionTooBusyException("Some old message"));
|
add(new RegionTooBusyException("Some old message"));
|
||||||
add(new ThrottlingException(" "));
|
add(new RpcThrottlingException(" "));
|
||||||
add(new MultiActionResultTooLarge(" "));
|
add(new MultiActionResultTooLarge(" "));
|
||||||
add(new RetryImmediatelyException(" "));
|
add(new RetryImmediatelyException(" "));
|
||||||
add(new CallQueueTooBigException());
|
add(new CallQueueTooBigException());
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,8 +22,12 @@ import static org.junit.Assert.assertNull;
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
@ -31,13 +35,18 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -99,10 +108,10 @@ public class TestQuotaAdmin {
|
|||||||
Admin admin = TEST_UTIL.getAdmin();
|
Admin admin = TEST_UTIL.getAdmin();
|
||||||
String userName = User.getCurrent().getShortName();
|
String userName = User.getCurrent().getShortName();
|
||||||
|
|
||||||
|
admin.setQuota(
|
||||||
|
QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
||||||
admin.setQuota(QuotaSettingsFactory
|
admin.setQuota(QuotaSettingsFactory
|
||||||
.throttleUser(userName, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
.throttleUser(userName, ThrottleType.WRITE_NUMBER, 12, TimeUnit.MINUTES));
|
||||||
admin.setQuota(QuotaSettingsFactory
|
|
||||||
.throttleUser(userName, ThrottleType.WRITE_NUMBER, 12, TimeUnit.MINUTES));
|
|
||||||
admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, true));
|
admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, true));
|
||||||
|
|
||||||
try (QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration())) {
|
try (QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration())) {
|
||||||
@ -183,6 +192,49 @@ public class TestQuotaAdmin {
|
|||||||
assertNumResults(0, null);
|
assertNumResults(0, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiQuotaThrottling() throws Exception {
|
||||||
|
byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||||
|
byte[] ROW = Bytes.toBytes("testRow");
|
||||||
|
byte[] QUALIFIER = Bytes.toBytes("testQualifier");
|
||||||
|
byte[] VALUE = Bytes.toBytes("testValue");
|
||||||
|
|
||||||
|
Admin admin = TEST_UTIL.getAdmin();
|
||||||
|
TableName tableName = TableName.valueOf("testMultiQuotaThrottling");
|
||||||
|
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
|
||||||
|
admin.createTable(desc);
|
||||||
|
|
||||||
|
// Set up the quota.
|
||||||
|
admin.setQuota(QuotaSettingsFactory.throttleTable(tableName, ThrottleType.WRITE_NUMBER, 6,
|
||||||
|
TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegionServerRpcQuotaManager().
|
||||||
|
getQuotaCache().triggerCacheRefresh();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
Table t = TEST_UTIL.getConnection().getTable(tableName);
|
||||||
|
try {
|
||||||
|
int size = 5;
|
||||||
|
List actions = new ArrayList();
|
||||||
|
Object[] results = new Object[size];
|
||||||
|
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
Put put1 = new Put(ROW);
|
||||||
|
put1.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||||
|
actions.add(put1);
|
||||||
|
}
|
||||||
|
t.batch(actions, results);
|
||||||
|
t.batch(actions, results);
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail("Not supposed to get ThrottlingExcepiton " + e);
|
||||||
|
} finally {
|
||||||
|
t.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQuotaRetrieverFilter() throws Exception {
|
public void testQuotaRetrieverFilter() throws Exception {
|
||||||
Admin admin = TEST_UTIL.getAdmin();
|
Admin admin = TEST_UTIL.getAdmin();
|
||||||
@ -321,8 +373,8 @@ public class TestQuotaAdmin {
|
|||||||
final TableName tn = TableName.valueOf("sq_table2");
|
final TableName tn = TableName.valueOf("sq_table2");
|
||||||
final long originalSizeLimit = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB
|
final long originalSizeLimit = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB
|
||||||
final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_WRITES;
|
final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_WRITES;
|
||||||
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
|
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, originalSizeLimit,
|
||||||
tn, originalSizeLimit, violationPolicy);
|
violationPolicy);
|
||||||
admin.setQuota(settings);
|
admin.setQuota(settings);
|
||||||
|
|
||||||
// Verify the Quotas in the table
|
// Verify the Quotas in the table
|
||||||
@ -349,8 +401,8 @@ public class TestQuotaAdmin {
|
|||||||
// Setting a new size and policy should be reflected
|
// Setting a new size and policy should be reflected
|
||||||
final long newSizeLimit = 1024L * 1024L * 1024L * 1024L; // 1TB
|
final long newSizeLimit = 1024L * 1024L * 1024L * 1024L; // 1TB
|
||||||
final SpaceViolationPolicy newViolationPolicy = SpaceViolationPolicy.NO_WRITES_COMPACTIONS;
|
final SpaceViolationPolicy newViolationPolicy = SpaceViolationPolicy.NO_WRITES_COMPACTIONS;
|
||||||
QuotaSettings newSettings = QuotaSettingsFactory.limitTableSpace(
|
QuotaSettings newSettings = QuotaSettingsFactory.limitTableSpace(tn, newSizeLimit,
|
||||||
tn, newSizeLimit, newViolationPolicy);
|
newViolationPolicy);
|
||||||
admin.setQuota(newSettings);
|
admin.setQuota(newSettings);
|
||||||
|
|
||||||
// Verify the new Quotas in the table
|
// Verify the new Quotas in the table
|
||||||
|
@ -216,7 +216,7 @@ public class TestQuotaState {
|
|||||||
try {
|
try {
|
||||||
limiter.checkQuota(1, 1);
|
limiter.checkQuota(1, 1);
|
||||||
fail("Should have thrown ThrottlingException");
|
fail("Should have thrown ThrottlingException");
|
||||||
} catch (ThrottlingException e) {
|
} catch (RpcThrottlingException e) {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -225,7 +225,7 @@ public class TestQuotaState {
|
|||||||
for (int i = 0; i < availReqs; ++i) {
|
for (int i = 0; i < availReqs; ++i) {
|
||||||
try {
|
try {
|
||||||
limiter.checkQuota(1, 1);
|
limiter.checkQuota(1, 1);
|
||||||
} catch (ThrottlingException e) {
|
} catch (RpcThrottlingException e) {
|
||||||
fail("Unexpected ThrottlingException after " + i + " requests. limit=" + availReqs);
|
fail("Unexpected ThrottlingException after " + i + " requests. limit=" + availReqs);
|
||||||
}
|
}
|
||||||
limiter.grabQuota(1, 1);
|
limiter.grabQuota(1, 1);
|
||||||
|
@ -520,7 +520,7 @@ public class TestQuotaThrottle {
|
|||||||
}
|
}
|
||||||
count += tables.length;
|
count += tables.length;
|
||||||
}
|
}
|
||||||
} catch (ThrottlingException e) {
|
} catch (RpcThrottlingException e) {
|
||||||
LOG.error("put failed after nRetries=" + count, e);
|
LOG.error("put failed after nRetries=" + count, e);
|
||||||
}
|
}
|
||||||
return count;
|
return count;
|
||||||
@ -536,7 +536,7 @@ public class TestQuotaThrottle {
|
|||||||
}
|
}
|
||||||
count += tables.length;
|
count += tables.length;
|
||||||
}
|
}
|
||||||
} catch (ThrottlingException e) {
|
} catch (RpcThrottlingException e) {
|
||||||
LOG.error("get failed after nRetries=" + count, e);
|
LOG.error("get failed after nRetries=" + count, e);
|
||||||
}
|
}
|
||||||
return count;
|
return count;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user