HBASE-24024 : Reject multi() requests with rows higher than threshold (#1560)

Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
Viraj Jasani 2020-04-22 12:46:40 +05:30
parent 5213d20ae8
commit a7a4f61040
No known key found for this signature in database
GPG Key ID: E906DFF511D3E5DB
4 changed files with 114 additions and 40 deletions

View File

@ -1678,4 +1678,16 @@ possible configurations would overwhelm and obscure the important.
enable this feature. enable this feature.
</description> </description>
</property> </property>
<property>
<name>hbase.rpc.rows.size.threshold.reject</name>
<value>false</value>
<description>
If value is true, RegionServer will abort batch requests of Put/Delete with number of rows
in a batch operation exceeding threshold defined by value of config:
hbase.rpc.rows.warning.threshold. The default value is false and hence, by default, only
warning will be logged. This config should be turned on to prevent RegionServer from serving
very large batch size of rows and this way we can improve CPU usages by discarding
too large batch request.
</description>
</property>
</configuration> </configuration>

View File

@ -240,6 +240,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/ */
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
/*
* Whether to reject rows with size > threshold defined by
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
*/
private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
"hbase.rpc.rows.size.threshold.reject";
/*
* Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
*/
private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;
// Request counter. (Includes requests that are not serviced by regions.)
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
final Counter requestCount = new Counter(); final Counter requestCount = new Counter();
@ -290,6 +303,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* Row size threshold for multi requests above which a warning is logged * Row size threshold for multi requests above which a warning is logged
*/ */
private final int rowSizeWarnThreshold; private final int rowSizeWarnThreshold;
/*
* Whether we should reject requests with very high no of rows i.e. beyond threshold
* defined by rowSizeWarnThreshold
*/
private final boolean rejectRowsWithSizeOverThreshold;
// We want to vet all accesses at the point of entry itself; limiting scope of access checker // We want to vet all accesses at the point of entry itself; limiting scope of access checker
// instance to only this class to prevent its use from spreading deeper into implementation. // instance to only this class to prevent its use from spreading deeper into implementation.
@ -1112,6 +1130,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionServer = rs; regionServer = rs;
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
RpcSchedulerFactory rpcSchedulerFactory; RpcSchedulerFactory rpcSchedulerFactory;
rejectRowsWithSizeOverThreshold = rs.conf
.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
try { try {
Class<?> rpcSchedulerFactoryClass = rs.conf.getClass( Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
@ -2371,7 +2392,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
} }
private void checkBatchSizeAndLogLargeSize(MultiRequest request) { private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
int sum = 0; int sum = 0;
String firstRegionName = null; String firstRegionName = null;
for (RegionAction regionAction : request.getRegionActionList()) { for (RegionAction regionAction : request.getRegionActionList()) {
@ -2382,6 +2403,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
if (sum > rowSizeWarnThreshold) { if (sum > rowSizeWarnThreshold) {
ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold); ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold);
if (rejectRowsWithSizeOverThreshold) {
throw new ServiceException(
"Rejecting large batch operation for current batch with firstRegionName: "
+ firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
+ rowSizeWarnThreshold);
}
} }
} }

View File

@ -1,5 +1,4 @@
/** /*
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,35 +17,40 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.mockito.Mockito.verify;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
/** /**
* Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test * Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test
* via "Multi" commands) so classified as MediumTests * via "Multi" commands) so classified as MediumTests
*/ */
@Category(MediumTests.class) @RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMultiLogThreshold { public class TestMultiLogThreshold {
private static RSRpcServices SERVICES; private static RSRpcServices SERVICES;
@ -58,22 +62,30 @@ public class TestMultiLogThreshold {
private static HRegionServer RS; private static HRegionServer RS;
private static int THRESHOLD; private static int THRESHOLD;
@BeforeClass @Parameterized.Parameter
public static void setup() throws Exception { public static boolean rejectLargeBatchOp;
@Parameterized.Parameters
public static List<Object[]> params() {
return Arrays.asList(new Object[] { false }, new Object[] { true });
}
@Before
public void setupTest() throws Exception {
final TableName tableName = TableName.valueOf("tableName"); final TableName tableName = TableName.valueOf("tableName");
TEST_UTIL = HBaseTestingUtility.createLocalHTU(); TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration(); CONF = TEST_UTIL.getConfiguration();
THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME, THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT); RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
TEST_UTIL.startMiniCluster(); TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(tableName, TEST_FAM); TEST_UTIL.createTable(tableName, TEST_FAM);
RS = TEST_UTIL.getRSForFirstRegionInTable(tableName); RS = TEST_UTIL.getRSForFirstRegionInTable(tableName);
} }
@Before @After
public void setupTest() throws Exception { public void tearDown() throws Exception {
LD = Mockito.mock(RSRpcServices.LogDelegate.class); TEST_UTIL.shutdownMiniCluster();
SERVICES = new RSRpcServices(RS, LD);
} }
private enum ActionType { private enum ActionType {
@ -85,8 +97,9 @@ public class TestMultiLogThreshold {
* "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of * "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of
* Actions * Actions
*/ */
private void sendMultiRequest(int rows, ActionType actionType) throws ServiceException { private void sendMultiRequest(int rows, ActionType actionType)
RpcController rpcc = Mockito.mock(RpcController.class); throws ServiceException, IOException {
RpcController rpcc = Mockito.mock(HBaseRpcController.class);
MultiRequest.Builder builder = MultiRequest.newBuilder(); MultiRequest.Builder builder = MultiRequest.newBuilder();
int numRAs = 1; int numRAs = 1;
int numAs = 1; int numAs = 1;
@ -109,35 +122,38 @@ public class TestMultiLogThreshold {
} }
builder.addRegionAction(rab.build()); builder.addRegionAction(rab.build());
} }
try { LD = Mockito.mock(RSRpcServices.LogDelegate.class);
SERVICES.multi(rpcc, builder.build()); SERVICES = new RSRpcServices(RS, LD);
} catch (ClassCastException e) { SERVICES.multi(rpcc, builder.build());
// swallow expected exception due to mocked RpcController
}
} }
@Test @Test
public void testMultiLogThresholdRegionActions() throws ServiceException, IOException { public void testMultiLogThresholdRegionActions() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD + 1, ActionType.REGION_ACTIONS); try {
verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); sendMultiRequest(THRESHOLD + 1, ActionType.REGION_ACTIONS);
} Assert.assertFalse(rejectLargeBatchOp);
} catch (ServiceException e) {
Assert.assertTrue(rejectLargeBatchOp);
}
Mockito.verify(LD, Mockito.times(1))
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
@Test
public void testMultiNoLogThresholdRegionActions() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD, ActionType.REGION_ACTIONS); sendMultiRequest(THRESHOLD, ActionType.REGION_ACTIONS);
verify(LD, Mockito.never()).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); Mockito.verify(LD, Mockito.never())
} .logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
@Test try {
public void testMultiLogThresholdActions() throws ServiceException, IOException { sendMultiRequest(THRESHOLD + 1, ActionType.ACTIONS);
sendMultiRequest(THRESHOLD + 1, ActionType.ACTIONS); Assert.assertFalse(rejectLargeBatchOp);
verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); } catch (ServiceException e) {
} Assert.assertTrue(rejectLargeBatchOp);
}
Mockito.verify(LD, Mockito.times(1))
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
@Test
public void testMultiNoLogThresholdAction() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD, ActionType.ACTIONS); sendMultiRequest(THRESHOLD, ActionType.ACTIONS);
verify(LD, Mockito.never()).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); Mockito.verify(LD, Mockito.never())
.logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());
} }
} }

View File

@ -2254,3 +2254,22 @@ The percent of region server RPC threads failed to abort RS.
.Default .Default
`-1` `-1`
[[hbase.rpc.rows.size.threshold.reject]]
*`hbase.rpc.rows.size.threshold.reject`*::
+
.Description
If value is true, RegionServer will abort batch requests of
Put/Delete with number of rows in a batch operation exceeding
threshold defined by value of config:
hbase.rpc.rows.warning.threshold.
The default value is false and hence, by default, only
warning will be logged. This config should be turned on to
prevent RegionServer from serving
very large batch size of rows and this way we can improve
CPU usages by discarding too large batch request.
+
.Default
`false`