HBASE-18023 Log multi-* requests for more than threshold number of rows

Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
David Harju 2017-06-22 13:29:34 -07:00 committed by Josh Elser
parent a4c26526c0
commit d9dd319390
3 changed files with 200 additions and 2 deletions

View File

@ -1767,4 +1767,11 @@ possible configurations would overwhelm and obscure the important.
Timeout for regionservers to keep threads in snapshot request pool waiting
</description>
</property>
<property>
<name>hbase.rpc.rows.warning.threshold</name>
<value>1000</value>
<description>
Number of rows in a batch operation above which a warning will be logged.
</description>
</property>
</configuration>

View File

@ -254,6 +254,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
/**
* Number of rows in a batch operation above which a warning will be logged.
*/
static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
/**
* Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
*/
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 1000;
// Request counter. (Includes requests that are not serviced by regions.)
final LongAdder requestCount = new LongAdder();
@ -300,6 +309,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private final long minimumScanTimeLimitDelta;
/**
* Row size threshold for multi requests above which a warning is logged
*/
private final int rowSizeWarnThreshold;
final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
/**
@ -1116,9 +1130,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
public RSRpcServices(HRegionServer rs) throws IOException {
regionServer = rs;
// Exposed for testing
static interface LogDelegate {
void logBatchWarning(int sum, int rowSizeWarnThreshold);
}
private static LogDelegate DEFAULT_LOG_DELEGATE = new LogDelegate() {
@Override
public void logBatchWarning(int sum, int rowSizeWarnThreshold) {
if (LOG.isWarnEnabled()) {
LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
+ ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
+ RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress());
}
}
};
private final LogDelegate ld;
public RSRpcServices(HRegionServer rs) throws IOException {
this(rs, DEFAULT_LOG_DELEGATE);
}
// Directly invoked only for testing
RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
this.ld = ld;
regionServer = rs;
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
RpcSchedulerFactory rpcSchedulerFactory;
try {
Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
@ -2492,6 +2530,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
}
private void checkBatchSizeAndLogLargeSize(MultiRequest request) {
int sum = 0;
for (RegionAction regionAction : request.getRegionActionList()) {
sum += regionAction.getActionCount();
}
if (sum > rowSizeWarnThreshold) {
ld.logBatchWarning(sum, rowSizeWarnThreshold);
}
}
/**
* Execute multiple actions on a table: get, mutate, and/or execCoprocessor
*
@ -2508,6 +2556,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new ServiceException(ie);
}
checkBatchSizeAndLogLargeSize(request);
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data.
HBaseRpcController controller = (HBaseRpcController)rpcc;

View File

@ -0,0 +1,141 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
/**
* Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test
* via "Multi" commands) so classified as MediumTests
*/
@Category(MediumTests.class)
public class TestMultiLogThreshold {
private static RSRpcServices SERVICES;
private static HBaseTestingUtility TEST_UTIL;
private static Configuration CONF;
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
private static RSRpcServices.LogDelegate LD;
private static HRegionServer RS;
private static int THRESHOLD;
@BeforeClass
public static void setup() throws Exception {
final TableName tableName = TableName.valueOf("tableName");
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(tableName, TEST_FAM);
RS = TEST_UTIL.getRSForFirstRegionInTable(tableName);
}
@Before
public void setupTest() throws Exception {
LD = Mockito.mock(RSRpcServices.LogDelegate.class);
SERVICES = new RSRpcServices(RS, LD);
}
private enum ActionType {
REGION_ACTIONS, ACTIONS;
}
/**
* Sends a multi request with a certain amount of rows, will populate Multi command with either
* "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of
* Actions
*/
private void sendMultiRequest(int rows, ActionType actionType) throws ServiceException {
RpcController rpcc = Mockito.mock(RpcController.class);
MultiRequest.Builder builder = MultiRequest.newBuilder();
int numRAs = 1;
int numAs = 1;
switch (actionType) {
case REGION_ACTIONS:
numRAs = rows;
break;
case ACTIONS:
numAs = rows;
break;
}
for (int i = 0; i < numRAs; i++) {
RegionAction.Builder rab = RegionAction.newBuilder();
rab.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
new String("someStuff" + i).getBytes()));
for (int j = 0; j < numAs; j++) {
Action.Builder ab = Action.newBuilder();
rab.addAction(ab.build());
}
builder.addRegionAction(rab.build());
}
try {
SERVICES.multi(rpcc, builder.build());
} catch (ClassCastException e) {
// swallow expected exception due to mocked RpcController
}
}
@Test
public void testMultiLogThresholdRegionActions() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD + 1, ActionType.REGION_ACTIONS);
verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyInt(), Mockito.anyInt());
}
@Test
public void testMultiNoLogThresholdRegionActions() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD, ActionType.REGION_ACTIONS);
verify(LD, Mockito.never()).logBatchWarning(Mockito.anyInt(), Mockito.anyInt());
}
@Test
public void testMultiLogThresholdActions() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD + 1, ActionType.ACTIONS);
verify(LD, Mockito.times(1)).logBatchWarning(Mockito.anyInt(), Mockito.anyInt());
}
@Test
public void testMultiNoLogThresholdAction() throws ServiceException, IOException {
sendMultiRequest(THRESHOLD, ActionType.ACTIONS);
verify(LD, Mockito.never()).logBatchWarning(Mockito.anyInt(), Mockito.anyInt());
}
}