HBASE-24024 : Reject multi() requests with rows higher than threshold (#1560)
Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
parent
1d35d0a9a9
commit
5ad233b60c
|
@ -1960,4 +1960,16 @@ possible configurations would overwhelm and obscure the important.
|
|||
responses with complete data.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.rpc.rows.size.threshold.reject</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
If value is true, RegionServer will abort batch requests of Put/Delete with number of rows
|
||||
in a batch operation exceeding threshold defined by value of config:
|
||||
hbase.rpc.rows.warning.threshold. The default value is false and hence, by default, only
|
||||
warning will be logged. This config should be turned on to prevent RegionServer from serving
|
||||
very large batch size of rows and this way we can improve CPU usages by discarding
|
||||
too large batch request.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
|
@ -296,6 +296,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
*/
|
||||
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
|
||||
|
||||
/*
|
||||
* Whether to reject rows with size > threshold defined by
|
||||
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
|
||||
*/
|
||||
private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
|
||||
"hbase.rpc.rows.size.threshold.reject";
|
||||
|
||||
/*
|
||||
* Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
|
||||
*/
|
||||
private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;
|
||||
|
||||
// Request counter. (Includes requests that are not serviced by regions.)
|
||||
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
|
||||
final LongAdder requestCount = new LongAdder();
|
||||
|
@ -348,6 +360,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
* Row size threshold for multi requests above which a warning is logged
|
||||
*/
|
||||
private final int rowSizeWarnThreshold;
|
||||
/*
|
||||
* Whether we should reject requests with very high no of rows i.e. beyond threshold
|
||||
* defined by rowSizeWarnThreshold
|
||||
*/
|
||||
private final boolean rejectRowsWithSizeOverThreshold;
|
||||
|
||||
final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
|
||||
|
||||
|
@ -1221,6 +1238,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
this.ld = ld;
|
||||
regionServer = rs;
|
||||
rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
|
||||
rejectRowsWithSizeOverThreshold =
|
||||
conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
|
||||
|
||||
final RpcSchedulerFactory rpcSchedulerFactory;
|
||||
try {
|
||||
|
@ -2655,7 +2674,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
|
||||
}
|
||||
|
||||
private void checkBatchSizeAndLogLargeSize(MultiRequest request) {
|
||||
private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
|
||||
int sum = 0;
|
||||
String firstRegionName = null;
|
||||
for (RegionAction regionAction : request.getRegionActionList()) {
|
||||
|
@ -2666,6 +2685,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
if (sum > rowSizeWarnThreshold) {
|
||||
ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold);
|
||||
if (rejectRowsWithSizeOverThreshold) {
|
||||
throw new ServiceException(
|
||||
"Rejecting large batch operation for current batch with firstRegionName: "
|
||||
+ firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
|
||||
+ rowSizeWarnThreshold);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -17,20 +17,24 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
|
@ -46,7 +50,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
|||
* Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test
|
||||
* via "Multi" commands) so classified as MediumTests
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
@RunWith(Parameterized.class)
|
||||
@Category(LargeTests.class)
|
||||
public class TestMultiLogThreshold {
|
||||
|
||||
@ClassRule
|
||||
|
@ -62,22 +67,30 @@ public class TestMultiLogThreshold {
|
|||
private static HRegionServer RS;
|
||||
private static int THRESHOLD;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
@Parameterized.Parameter
|
||||
public static boolean rejectLargeBatchOp;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static List<Object[]> params() {
|
||||
return Arrays.asList(new Object[] { false }, new Object[] { true });
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupTest() throws Exception {
|
||||
final TableName tableName = TableName.valueOf("tableName");
|
||||
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
|
||||
CONF = TEST_UTIL.getConfiguration();
|
||||
THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
|
||||
RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
|
||||
CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
|
||||
TEST_UTIL.startMiniCluster();
|
||||
TEST_UTIL.createTable(tableName, TEST_FAM);
|
||||
RS = TEST_UTIL.getRSForFirstRegionInTable(tableName);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupTest() throws Exception {
|
||||
LD = Mockito.mock(RSRpcServices.LogDelegate.class);
|
||||
SERVICES = new RSRpcServices(RS, LD);
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private enum ActionType {
|
||||
|
@ -89,8 +102,9 @@ public class TestMultiLogThreshold {
|
|||
* "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of
|
||||
* Actions
|
||||
*/
|
||||
private void sendMultiRequest(int rows, ActionType actionType) throws ServiceException {
|
||||
RpcController rpcc = Mockito.mock(RpcController.class);
|
||||
private void sendMultiRequest(int rows, ActionType actionType)
|
||||
throws ServiceException, IOException {
|
||||
RpcController rpcc = Mockito.mock(HBaseRpcController.class);
|
||||
MultiRequest.Builder builder = MultiRequest.newBuilder();
|
||||
int numRAs = 1;
|
||||
int numAs = 1;
|
||||
|
@ -113,35 +127,38 @@ public class TestMultiLogThreshold {
|
|||
}
|
||||
builder.addRegionAction(rab.build());
|
||||
}
|
||||
try {
|
||||
SERVICES.multi(rpcc, builder.build());
|
||||
} catch (ClassCastException e) {
|
||||
// swallow expected exception due to mocked RpcController
|
||||
}
|
||||
LD = Mockito.mock(RSRpcServices.LogDelegate.class);
|
||||
SERVICES = new RSRpcServices(RS, LD);
|
||||
SERVICES.multi(rpcc, builder.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiLogThresholdRegionActions() throws ServiceException, IOException {
|
||||
sendMultiRequest(THRESHOLD + 1, ActionType.REGION_ACTIONS);
|
||||
verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
|
||||
}
|
||||
try {
|
||||
sendMultiRequest(THRESHOLD + 1, ActionType.REGION_ACTIONS);
|
||||
Assert.assertFalse(rejectLargeBatchOp);
|
||||
} catch (ServiceException e) {
|
||||
Assert.assertTrue(rejectLargeBatchOp);
|
||||
}
|
||||
Mockito.verify(LD, Mockito.times(1))
|
||||
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
|
||||
|
||||
@Test
|
||||
public void testMultiNoLogThresholdRegionActions() throws ServiceException, IOException {
|
||||
sendMultiRequest(THRESHOLD, ActionType.REGION_ACTIONS);
|
||||
verify(LD, Mockito.never()).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
|
||||
}
|
||||
Mockito.verify(LD, Mockito.never())
|
||||
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
|
||||
|
||||
@Test
|
||||
public void testMultiLogThresholdActions() throws ServiceException, IOException {
|
||||
sendMultiRequest(THRESHOLD + 1, ActionType.ACTIONS);
|
||||
verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
|
||||
}
|
||||
try {
|
||||
sendMultiRequest(THRESHOLD + 1, ActionType.ACTIONS);
|
||||
Assert.assertFalse(rejectLargeBatchOp);
|
||||
} catch (ServiceException e) {
|
||||
Assert.assertTrue(rejectLargeBatchOp);
|
||||
}
|
||||
Mockito.verify(LD, Mockito.times(1))
|
||||
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
|
||||
|
||||
@Test
|
||||
public void testMultiNoLogThresholdAction() throws ServiceException, IOException {
|
||||
sendMultiRequest(THRESHOLD, ActionType.ACTIONS);
|
||||
verify(LD, Mockito.never()).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
|
||||
Mockito.verify(LD, Mockito.never())
|
||||
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2245,3 +2245,23 @@ The percent of region server RPC threads failed to abort RS.
|
|||
+
|
||||
.Default
|
||||
`0`
|
||||
|
||||
|
||||
[[hbase.rpc.rows.size.threshold.reject]]
|
||||
*`hbase.rpc.rows.size.threshold.reject`*::
|
||||
+
|
||||
.Description
|
||||
|
||||
If value is true, RegionServer will abort batch requests of
|
||||
Put/Delete with number of rows in a batch operation exceeding
|
||||
threshold defined by value of config:
|
||||
hbase.rpc.rows.warning.threshold.
|
||||
The default value is false and hence, by default, only
|
||||
warning will be logged. This config should be turned on to
|
||||
prevent RegionServer from serving
|
||||
very large batch size of rows and this way we can improve
|
||||
CPU usages by discarding too large batch request.
|
||||
|
||||
+
|
||||
.Default
|
||||
`false`
|
||||
|
|
Loading…
Reference in New Issue