HBASE-17408 Introduce per request limit by number of mutations (ChiaPing Tsai)

This commit is contained in:
tedyu 2017-01-06 13:07:15 -08:00
parent 4c98f97c31
commit b2a9be02ac
3 changed files with 183 additions and 76 deletions

View File

@ -49,30 +49,39 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Holds back the request if the submitted size or number has reached the
* threshold.
* Holds back the requests if they reach any thresholds.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class SimpleRequestController implements RequestController {
private static final Log LOG = LogFactory.getLog(SimpleRequestController.class);
/**
* The maximum size of single RegionServer.
* The maximum heap size for each request.
*/
public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
/**
* Default value of #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE
* Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}.
*/
@VisibleForTesting
static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
/**
* The maximum number of rows for each request.
*/
public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows";
/**
* Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}.
*/
@VisibleForTesting
static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048;
/**
* The maximum size of submit.
*/
public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
/**
* Default value of #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE
* Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
*/
@VisibleForTesting
static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
@ -89,9 +98,13 @@ class SimpleRequestController implements RequestController {
private final int maxTotalConcurrentTasks;
/**
* The max heap size of all tasks simultaneously executed on a server.
* The maximum heap size for each request.
*/
private final long maxHeapSizePerRequest;
/**
* The maximum number of rows for each request.
*/
private final long maxRowsPerRequest;
private final long maxHeapSizeSubmit;
/**
* The number of tasks we run in parallel on a single region. With 1 (the
@ -116,41 +129,46 @@ class SimpleRequestController implements RequestController {
private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
private final int thresholdToLogRegionDetails;
SimpleRequestController(final Configuration conf) {
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
this.maxTotalConcurrentTasks = checkAndGet(conf,
HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
this.maxConcurrentTasksPerServer = checkAndGet(conf,
HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
this.maxConcurrentTasksPerRegion = checkAndGet(conf,
HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
this.maxHeapSizePerRequest = checkAndGet(conf,
HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
this.thresholdToLogUndoneTaskDetails =
conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
this.maxRowsPerRequest = checkAndGet(conf,
HBASE_CLIENT_MAX_PERREQUEST_ROWS,
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS);
this.maxHeapSizeSubmit = checkAndGet(conf,
HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE,
DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
this.thresholdToLogUndoneTaskDetails = conf.getInt(
THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
this.thresholdToLogRegionDetails =
conf.getInt(THRESHOLD_TO_LOG_REGION_DETAILS,
this.thresholdToLogRegionDetails = conf.getInt(
THRESHOLD_TO_LOG_REGION_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
if (this.maxTotalConcurrentTasks <= 0) {
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
}
if (this.maxConcurrentTasksPerServer <= 0) {
throw new IllegalArgumentException("maxConcurrentTasksPerServer="
+ maxConcurrentTasksPerServer);
}
if (this.maxConcurrentTasksPerRegion <= 0) {
throw new IllegalArgumentException("maxConcurrentTasksPerRegion="
+ maxConcurrentTasksPerRegion);
}
if (this.maxHeapSizePerRequest <= 0) {
throw new IllegalArgumentException("maxHeapSizePerServer="
+ maxHeapSizePerRequest);
}
}
if (this.maxHeapSizeSubmit <= 0) {
throw new IllegalArgumentException("maxHeapSizeSubmit="
+ maxHeapSizeSubmit);
private static int checkAndGet(Configuration conf, String key, int defaultValue) {
int value = conf.getInt(key, defaultValue);
if (value <= 0) {
throw new IllegalArgumentException(key + "=" + value);
}
return value;
}
private static long checkAndGet(Configuration conf, String key, long defaultValue) {
long value = conf.getLong(key, defaultValue);
if (value <= 0) {
throw new IllegalArgumentException(key + "=" + value);
}
return value;
}
@VisibleForTesting
@ -163,10 +181,10 @@ class SimpleRequestController implements RequestController {
if (isEnd) {
return ReturnCode.END;
}
long rowSize = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
ReturnCode code = ReturnCode.INCLUDE;
for (RowChecker checker : checkers) {
switch (checker.canTakeOperation(loc, rowSize)) {
switch (checker.canTakeOperation(loc, heapSizeOfRow)) {
case END:
isEnd = true;
code = ReturnCode.END;
@ -183,7 +201,7 @@ class SimpleRequestController implements RequestController {
}
}
for (RowChecker checker : checkers) {
checker.notifyFinal(code, loc, rowSize);
checker.notifyFinal(code, loc, heapSizeOfRow);
}
return code;
}
@ -208,15 +226,16 @@ class SimpleRequestController implements RequestController {
@Override
public Checker newChecker() {
List<RowChecker> checkers = new ArrayList<>(3);
List<RowChecker> checkers = new ArrayList<>(4);
checkers.add(new TaskCountChecker(maxTotalConcurrentTasks,
maxConcurrentTasksPerServer,
maxConcurrentTasksPerRegion,
tasksInProgress,
taskCounterPerServer,
taskCounterPerRegion));
checkers.add(new RequestSizeChecker(maxHeapSizePerRequest));
checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest));
checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
checkers.add(new RequestRowsChecker(maxRowsPerRequest));
return newChecker(checkers);
}
@ -323,7 +342,7 @@ class SimpleRequestController implements RequestController {
}
@Override
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
if (heapSize >= maxHeapSizeSubmit) {
return ReturnCode.END;
}
@ -331,9 +350,9 @@ class SimpleRequestController implements RequestController {
}
@Override
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
if (code == ReturnCode.INCLUDE) {
heapSize += rowSize;
heapSize += heapSizeOfRow;
}
}
@ -413,11 +432,11 @@ class SimpleRequestController implements RequestController {
* tasks for server.
*
* @param loc
* @param rowSize
* @param heapSizeOfRow
* @return
*/
@Override
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
HRegionInfo regionInfo = loc.getRegionInfo();
if (regionsIncluded.contains(regionInfo)) {
@ -444,7 +463,7 @@ class SimpleRequestController implements RequestController {
}
@Override
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
if (code == ReturnCode.INCLUDE) {
regionsIncluded.add(loc.getRegionInfo());
serversIncluded.add(loc.getServerName());
@ -454,15 +473,54 @@ class SimpleRequestController implements RequestController {
}
/**
* limit the request size for each regionserver.
* limit the number of rows for each request.
*/
@VisibleForTesting
static class RequestSizeChecker implements RowChecker {
static class RequestRowsChecker implements RowChecker {
private final long maxRowsPerRequest;
private final Map<ServerName, Long> serverRows = new HashMap<>();
RequestRowsChecker(final long maxRowsPerRequest) {
this.maxRowsPerRequest = maxRowsPerRequest;
}
@Override
public void reset() {
serverRows.clear();
}
@Override
public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
long currentRows = serverRows.containsKey(loc.getServerName())
? serverRows.get(loc.getServerName()) : 0L;
// accept at least one row
if (currentRows == 0 || currentRows < maxRowsPerRequest) {
return ReturnCode.INCLUDE;
}
return ReturnCode.SKIP;
}
@Override
public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
if (code == ReturnCode.INCLUDE) {
long currentRows = serverRows.containsKey(loc.getServerName())
? serverRows.get(loc.getServerName()) : 0L;
serverRows.put(loc.getServerName(), currentRows + 1);
}
}
}
/**
* limit the heap size for each request.
*/
@VisibleForTesting
static class RequestHeapSizeChecker implements RowChecker {
private final long maxHeapSizePerRequest;
private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
RequestSizeChecker(final long maxHeapSizePerRequest) {
RequestHeapSizeChecker(final long maxHeapSizePerRequest) {
this.maxHeapSizePerRequest = maxHeapSizePerRequest;
}
@ -472,23 +530,23 @@ class SimpleRequestController implements RequestController {
}
@Override
public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
// Is it ok for limit of request size?
long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
? serverRequestSizes.get(loc.getServerName()) : 0L;
// accept at least one request
if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) {
if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) {
return ReturnCode.INCLUDE;
}
return ReturnCode.SKIP;
}
@Override
public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
if (code == ReturnCode.INCLUDE) {
long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
? serverRequestSizes.get(loc.getServerName()) : 0L;
serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize);
serverRequestSizes.put(loc.getServerName(), currentRequestSize + heapSizeOfRow);
}
}
}
@ -499,7 +557,7 @@ class SimpleRequestController implements RequestController {
@VisibleForTesting
interface RowChecker {
ReturnCode canTakeOperation(HRegionLocation loc, long rowSize);
ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow);
/**
* Add the final ReturnCode to the checker. The ReturnCode may be reversed,
@ -507,9 +565,9 @@ class SimpleRequestController implements RequestController {
*
* @param code The final decision
* @param loc the destination of data
* @param rowSize the data size
* @param heapSizeOfRow the data size
*/
void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize);
void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow);
/**
* Reset the inner state.

View File

@ -56,7 +56,6 @@ public class TestSimpleRequestController {
private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
private static final ServerName SN = ServerName.valueOf("s1:1,1");
private static final ServerName SN2 = ServerName.valueOf("s2:2,2");
private static final ServerName SN3 = ServerName.valueOf("s3:3,3");
private static final HRegionInfo HRI1
= new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
private static final HRegionInfo HRI2
@ -68,7 +67,7 @@ public class TestSimpleRequestController {
private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);
@Test
public void testIllegalRequestSize() {
public void testIllegalRequestHeapSize() {
testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
}
@ -87,9 +86,14 @@ public class TestSimpleRequestController {
testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
}
@Test
public void testIllegalRequestRows() {
testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1);
}
private void testIllegalArgument(String key, long value) {
Configuration conf = HBaseConfiguration.create();
conf.setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
conf.setLong(key, value);
try {
SimpleRequestController controller = new SimpleRequestController(conf);
fail("The " + key + " must be bigger than zero");
@ -121,7 +125,7 @@ public class TestSimpleRequestController {
tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
final long maxHeapSizePerRequest = 2 * 1024 * 1024;
// unlimiited
SimpleRequestController.RequestSizeChecker sizeChecker = new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest);
SimpleRequestController.RequestHeapSizeChecker sizeChecker = new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
RequestController.Checker checker = SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
assertEquals(ReturnCode.INCLUDE, loc1Code);
@ -151,10 +155,10 @@ public class TestSimpleRequestController {
}
@Test
public void testRequestSizeCheckerr() throws IOException {
public void testRequestHeapSizeChecker() throws IOException {
final long maxHeapSizePerRequest = 2 * 1024 * 1024;
SimpleRequestController.RequestSizeChecker checker
= new SimpleRequestController.RequestSizeChecker(maxHeapSizePerRequest);
SimpleRequestController.RequestHeapSizeChecker checker
= new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
// inner state is unchanged.
for (int i = 0; i != 10; ++i) {
@ -192,6 +196,51 @@ public class TestSimpleRequestController {
}
}
@Test
public void testRequestRowsChecker() throws IOException {
final long maxRowCount = 100;
SimpleRequestController.RequestRowsChecker checker
= new SimpleRequestController.RequestRowsChecker(maxRowCount);
final long heapSizeOfRow = 100; //unused
// inner state is unchanged.
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, code);
code = checker.canTakeOperation(LOC2, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, code);
}
// accept the data located on LOC1 region.
for (int i = 0; i != maxRowCount; ++i) {
ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, acceptCode);
checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow);
}
// the sn server reachs the limit.
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
assertNotEquals(ReturnCode.INCLUDE, code);
code = checker.canTakeOperation(LOC2, heapSizeOfRow);
assertNotEquals(ReturnCode.INCLUDE, code);
}
// the request to sn2 server should be accepted.
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, code);
}
checker.reset();
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, code);
code = checker.canTakeOperation(LOC2, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, code);
}
}
@Test
public void testSubmittedSizeChecker() {
final long maxHeapSizeSubmit = 2 * 1024 * 1024;
@ -224,7 +273,7 @@ public class TestSimpleRequestController {
@Test
public void testTaskCountChecker() throws InterruptedIOException {
long rowSize = 12345;
long heapSizeOfRow = 12345;
int maxTotalConcurrentTasks = 100;
int maxConcurrentTasksPerServer = 2;
int maxConcurrentTasksPerRegion = 1;
@ -239,13 +288,13 @@ public class TestSimpleRequestController {
// inner state is unchanged.
for (int i = 0; i != 10; ++i) {
ReturnCode code = checker.canTakeOperation(LOC1, rowSize);
ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, code);
}
// add LOC1 region.
ReturnCode code = checker.canTakeOperation(LOC1, rowSize);
ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, code);
checker.notifyFinal(code, LOC1, rowSize);
checker.notifyFinal(code, LOC1, heapSizeOfRow);
// fill the task slots for LOC1.
taskCounterPerRegion.put(LOC1.getRegionInfo().getRegionName(), new AtomicInteger(100));
@ -253,9 +302,9 @@ public class TestSimpleRequestController {
// the region was previously accepted, so it must be accpted now.
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
ReturnCode includeCode = checker.canTakeOperation(LOC1, rowSize);
ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, includeCode);
checker.notifyFinal(includeCode, LOC1, rowSize);
checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
}
// fill the task slots for LOC3.
@ -264,9 +313,9 @@ public class TestSimpleRequestController {
// no task slots.
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
ReturnCode excludeCode = checker.canTakeOperation(LOC3, rowSize);
ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
assertNotEquals(ReturnCode.INCLUDE, excludeCode);
checker.notifyFinal(excludeCode, LOC3, rowSize);
checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow);
}
// release the tasks for LOC3.
@ -274,15 +323,15 @@ public class TestSimpleRequestController {
taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
// add LOC3 region.
ReturnCode code3 = checker.canTakeOperation(LOC3, rowSize);
ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, code3);
checker.notifyFinal(code3, LOC3, rowSize);
checker.notifyFinal(code3, LOC3, heapSizeOfRow);
// the region was previously accepted, so it must be accpted now.
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
ReturnCode includeCode = checker.canTakeOperation(LOC3, rowSize);
ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
assertEquals(ReturnCode.INCLUDE, includeCode);
checker.notifyFinal(includeCode, LOC3, rowSize);
checker.notifyFinal(includeCode, LOC3, heapSizeOfRow);
}
checker.reset();
@ -290,9 +339,9 @@ public class TestSimpleRequestController {
// but checker have reseted and task slots for LOC1 is full.
// So it must be rejected now.
for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
ReturnCode includeCode = checker.canTakeOperation(LOC1, rowSize);
ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
assertNotEquals(ReturnCode.INCLUDE, includeCode);
checker.notifyFinal(includeCode, LOC1, rowSize);
checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
}
}

View File

@ -41,7 +41,7 @@ public interface RegionServerObserver extends Coprocessor {
throws IOException;
/**
* Called before the regions merge.
* Called before the regions merge.
* Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} to skip the merge.
* @throws IOException if an error occurred on the coprocessor
* @param ctx