HBASE-24024 : Reject multi() requests with rows higher than threshold (#1560)

Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
Viraj Jasani 2020-04-22 12:46:40 +05:30 committed by GitHub
parent 75c717d4c2
commit 4ed15e51d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 35 deletions

View File

@ -1946,4 +1946,16 @@ possible configurations would overwhelm and obscure the important.
responses with complete data.
</description>
</property>
<property>
<name>hbase.rpc.rows.size.threshold.reject</name>
<value>false</value>
<description>
If value is true, RegionServer will abort batch requests of Put/Delete with number of rows
in a batch operation exceeding threshold defined by value of config:
hbase.rpc.rows.warning.threshold. The default value is false and hence, by default, only
warning will be logged. This config should be turned on to prevent RegionServer from serving
very large batch size of rows and this way we can improve CPU usages by discarding
too large batch request.
</description>
</property>
</configuration>

View File

@ -300,6 +300,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
/*
* Whether to reject rows with size > threshold defined by
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
*/
private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
"hbase.rpc.rows.size.threshold.reject";
/*
* Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
*/
private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;
// Request counter. (Includes requests that are not serviced by regions.)
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
final LongAdder requestCount = new LongAdder();
@ -352,6 +364,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* Row size threshold for multi requests above which a warning is logged
*/
private final int rowSizeWarnThreshold;
/*
* Whether we should reject requests with very high no of rows i.e. beyond threshold
* defined by rowSizeWarnThreshold
*/
private final boolean rejectRowsWithSizeOverThreshold;
final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
@ -1225,6 +1242,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
this.ld = ld;
regionServer = rs;
rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
rejectRowsWithSizeOverThreshold =
conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
final RpcSchedulerFactory rpcSchedulerFactory;
try {
@ -2682,7 +2701,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
}
private void checkBatchSizeAndLogLargeSize(MultiRequest request) {
private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
int sum = 0;
String firstRegionName = null;
for (RegionAction regionAction : request.getRegionActionList()) {
@ -2693,6 +2712,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
if (sum > rowSizeWarnThreshold) {
ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold);
if (rejectRowsWithSizeOverThreshold) {
throw new ServiceException(
"Rejecting large batch operation for current batch with firstRegionName: "
+ firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
+ rowSizeWarnThreshold);
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,20 +17,24 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
@ -46,7 +50,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
* Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test
* via "Multi" commands) so classified as MediumTests
*/
@Category(MediumTests.class)
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMultiLogThreshold {
@ClassRule
@ -62,22 +67,30 @@ public class TestMultiLogThreshold {
private static HRegionServer RS;
private static int THRESHOLD;
@BeforeClass
public static void setup() throws Exception {
@Parameterized.Parameter
public static boolean rejectLargeBatchOp;
@Parameterized.Parameters
public static List<Object[]> params() {
return Arrays.asList(new Object[] { false }, new Object[] { true });
}
@Before
public void setupTest() throws Exception {
final TableName tableName = TableName.valueOf("tableName");
TEST_UTIL = new HBaseTestingUtility();
CONF = TEST_UTIL.getConfiguration();
THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(tableName, TEST_FAM);
RS = TEST_UTIL.getRSForFirstRegionInTable(tableName);
}
@Before
public void setupTest() throws Exception {
LD = Mockito.mock(RSRpcServices.LogDelegate.class);
SERVICES = new RSRpcServices(RS, LD);
@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
private enum ActionType {
@ -89,8 +102,9 @@ public class TestMultiLogThreshold {
* "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of
* Actions
*/
private void sendMultiRequest(int rows, ActionType actionType) throws ServiceException {
RpcController rpcc = Mockito.mock(RpcController.class);
private void sendMultiRequest(int rows, ActionType actionType)
throws ServiceException, IOException {
RpcController rpcc = Mockito.mock(HBaseRpcController.class);
MultiRequest.Builder builder = MultiRequest.newBuilder();
int numRAs = 1;
int numAs = 1;
@ -113,35 +127,38 @@ public class TestMultiLogThreshold {
}
builder.addRegionAction(rab.build());
}
try {
SERVICES.multi(rpcc, builder.build());
} catch (ClassCastException e) {
// swallow expected exception due to mocked RpcController
}
LD = Mockito.mock(RSRpcServices.LogDelegate.class);
SERVICES = new RSRpcServices(RS, LD);
SERVICES.multi(rpcc, builder.build());
}
@Test
public void testMultiLogThresholdRegionActions() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD + 1, ActionType.REGION_ACTIONS);
verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
}
try {
sendMultiRequest(THRESHOLD + 1, ActionType.REGION_ACTIONS);
Assert.assertFalse(rejectLargeBatchOp);
} catch (ServiceException e) {
Assert.assertTrue(rejectLargeBatchOp);
}
Mockito.verify(LD, Mockito.times(1))
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
@Test
public void testMultiNoLogThresholdRegionActions() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD, ActionType.REGION_ACTIONS);
verify(LD, Mockito.never()).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
}
Mockito.verify(LD, Mockito.never())
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
@Test
public void testMultiLogThresholdActions() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD + 1, ActionType.ACTIONS);
verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
}
try {
sendMultiRequest(THRESHOLD + 1, ActionType.ACTIONS);
Assert.assertFalse(rejectLargeBatchOp);
} catch (ServiceException e) {
Assert.assertTrue(rejectLargeBatchOp);
}
Mockito.verify(LD, Mockito.times(1))
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
@Test
public void testMultiNoLogThresholdAction() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD, ActionType.ACTIONS);
verify(LD, Mockito.never()).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
Mockito.verify(LD, Mockito.never())
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
}
}

View File

@ -2245,3 +2245,22 @@ The percent of region server RPC threads failed to abort RS.
.Default
`false`
[[hbase.rpc.rows.size.threshold.reject]]
*`hbase.rpc.rows.size.threshold.reject`*::
+
.Description
If value is true, RegionServer will abort batch requests of
Put/Delete with number of rows in a batch operation exceeding
threshold defined by value of config:
hbase.rpc.rows.warning.threshold.
The default value is false and hence, by default, only
warning will be logged. This config should be turned on to
prevent RegionServer from serving
very large batch size of rows and this way we can improve
CPU usages by discarding too large batch request.
+
.Default
`false`