From 5ad233b60c54ea7b3b5bccdd796d44dc15cb477c Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Wed, 22 Apr 2020 12:46:40 +0530 Subject: [PATCH] HBASE-24024 : Reject multi() requests with rows higher than threshold (#1560) Signed-off-by: Reid Chan --- .../src/main/resources/hbase-default.xml | 12 +++ .../hbase/regionserver/RSRpcServices.java | 27 +++++- .../regionserver/TestMultiLogThreshold.java | 85 +++++++++++-------- .../asciidoc/_chapters/hbase-default.adoc | 20 +++++ 4 files changed, 109 insertions(+), 35 deletions(-) diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 56fa7678c22..128f3f6275f 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1960,4 +1960,16 @@ possible configurations would overwhelm and obscure the important. responses with complete data. + + hbase.rpc.rows.size.threshold.reject + false + + If value is true, RegionServer will abort batch requests of Put/Delete with number of rows + in a batch operation exceeding threshold defined by value of config: + hbase.rpc.rows.warning.threshold. The default value is false and hence, by default, only + warning will be logged. This config should be turned on to prevent RegionServer from serving + very large batch size of rows and this way we can improve CPU usages by discarding + too large batch request. + + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index bc627575a8c..a104f495e90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -296,6 +296,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; + /* + * Whether to reject rows with size > threshold defined by + * {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME} + */ + private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD = + "hbase.rpc.rows.size.threshold.reject"; + + /* + * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD} + */ + private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false; + // Request counter. (Includes requests that are not serviced by regions.) // Count only once for requests with multiple actions like multi/caching-scan/replayBatch final LongAdder requestCount = new LongAdder(); @@ -348,6 +360,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * Row size threshold for multi requests above which a warning is logged */ private final int rowSizeWarnThreshold; + /* + * Whether we should reject requests with very high no of rows i.e. beyond threshold + * defined by rowSizeWarnThreshold + */ + private final boolean rejectRowsWithSizeOverThreshold; final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); @@ -1221,6 +1238,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, this.ld = ld; regionServer = rs; rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); + rejectRowsWithSizeOverThreshold = + conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD); final RpcSchedulerFactory rpcSchedulerFactory; try { @@ -2655,7 +2674,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); } - private void checkBatchSizeAndLogLargeSize(MultiRequest request) { + private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException { int sum = 0; String firstRegionName = null; for (RegionAction regionAction : request.getRegionActionList()) { @@ -2666,6 +2685,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } if (sum > rowSizeWarnThreshold) { ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold); + if (rejectRowsWithSizeOverThreshold) { + throw new ServiceException( + "Rejecting large batch operation for current batch with firstRegionName: " + + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: " + + rowSizeWarnThreshold); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java index 8e11ed58d20..abc9564920b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,20 +17,24 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.mockito.Mockito.verify; - import java.io.IOException; +import java.util.Arrays; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; @@ -46,7 +50,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; * Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test * via "Multi" commands) so classified as MediumTests */ -@Category(MediumTests.class) +@RunWith(Parameterized.class) +@Category(LargeTests.class) public class TestMultiLogThreshold { @ClassRule @@ -62,22 +67,30 @@ public class TestMultiLogThreshold { private static HRegionServer RS; private static int THRESHOLD; - @BeforeClass - public static void setup() throws Exception { + @Parameterized.Parameter + public static boolean rejectLargeBatchOp; + + @Parameterized.Parameters + public static List params() { + return Arrays.asList(new Object[] { false }, new Object[] { true }); + } + + @Before + public void setupTest() throws Exception { final TableName tableName = TableName.valueOf("tableName"); TEST_UTIL = HBaseTestingUtility.createLocalHTU(); CONF = TEST_UTIL.getConfiguration(); THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME, RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT); + CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp); TEST_UTIL.startMiniCluster(); TEST_UTIL.createTable(tableName, TEST_FAM); RS = TEST_UTIL.getRSForFirstRegionInTable(tableName); } - @Before - public void setupTest() throws Exception { - LD = Mockito.mock(RSRpcServices.LogDelegate.class); - SERVICES = new RSRpcServices(RS, LD); + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); } private enum ActionType { @@ -89,8 +102,9 @@ public class TestMultiLogThreshold { * "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of * Actions */ - private void sendMultiRequest(int rows, ActionType actionType) throws ServiceException { - RpcController rpcc = Mockito.mock(RpcController.class); + private void sendMultiRequest(int rows, ActionType actionType) + throws ServiceException, IOException { + RpcController rpcc = Mockito.mock(HBaseRpcController.class); MultiRequest.Builder builder = MultiRequest.newBuilder(); int numRAs = 1; int numAs = 1; @@ -113,35 +127,38 @@ public class TestMultiLogThreshold { } builder.addRegionAction(rab.build()); } - try { - SERVICES.multi(rpcc, builder.build()); - } catch (ClassCastException e) { - // swallow expected exception due to mocked RpcController - } + LD = Mockito.mock(RSRpcServices.LogDelegate.class); + SERVICES = new RSRpcServices(RS, LD); + SERVICES.multi(rpcc, builder.build()); } @Test public void testMultiLogThresholdRegionActions() throws ServiceException, IOException { - sendMultiRequest(THRESHOLD + 1, ActionType.REGION_ACTIONS); - verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); - } + try { + sendMultiRequest(THRESHOLD + 1, ActionType.REGION_ACTIONS); + Assert.assertFalse(rejectLargeBatchOp); + } catch (ServiceException e) { + Assert.assertTrue(rejectLargeBatchOp); + } + Mockito.verify(LD, Mockito.times(1)) + .logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); - @Test - public void testMultiNoLogThresholdRegionActions() throws ServiceException, IOException { sendMultiRequest(THRESHOLD, ActionType.REGION_ACTIONS); - verify(LD, Mockito.never()).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); - } + Mockito.verify(LD, Mockito.never()) + .logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); - @Test - public void testMultiLogThresholdActions() throws ServiceException, IOException { - sendMultiRequest(THRESHOLD + 1, ActionType.ACTIONS); - verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); - } + try { + sendMultiRequest(THRESHOLD + 1, ActionType.ACTIONS); + Assert.assertFalse(rejectLargeBatchOp); + } catch (ServiceException e) { + Assert.assertTrue(rejectLargeBatchOp); + } + Mockito.verify(LD, Mockito.times(1)) + .logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); - @Test - public void testMultiNoLogThresholdAction() throws ServiceException, IOException { sendMultiRequest(THRESHOLD, ActionType.ACTIONS); - verify(LD, Mockito.never()).logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); + Mockito.verify(LD, Mockito.never()) + .logBatchWarning(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt()); } } diff --git a/src/main/asciidoc/_chapters/hbase-default.adoc b/src/main/asciidoc/_chapters/hbase-default.adoc index b0cd4bff4c2..e232ab22900 100644 --- a/src/main/asciidoc/_chapters/hbase-default.adoc +++ b/src/main/asciidoc/_chapters/hbase-default.adoc @@ -2245,3 +2245,23 @@ The percent of region server RPC threads failed to abort RS. + .Default `0` + + +[[hbase.rpc.rows.size.threshold.reject]] +*`hbase.rpc.rows.size.threshold.reject`*:: ++ +.Description + + If value is true, RegionServer will abort batch requests of + Put/Delete with number of rows in a batch operation exceeding + threshold defined by value of config: + hbase.rpc.rows.warning.threshold. + The default value is false and hence, by default, only + warning will be logged. This config should be turned on to + prevent RegionServer from serving + very large batch size of rows and this way we can improve + CPU usages by discarding too large batch request. + ++ +.Default +`false`