HBASE-14978 Don't allow Multi to retain too many blocks

Elliott Clark 2015-12-14 18:16:03 -08:00
parent bbfff0d072
commit 217036d816
4 changed files with 137 additions and 23 deletions

RpcCallContext.java

@@ -90,4 +90,7 @@ public interface RpcCallContext extends Delayable {
* onerous.
*/
void incrementResponseCellSize(long cellSize);
long getResponseBlockSize();
void incrementResponseBlockSize(long blockSize);
}
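The two new methods give RpcCallContext the same kind of running total for retained data blocks that incrementResponseCellSize already provides for cells. Below is a minimal sketch of how server-side code might drive the accounting; the 64 KB increment and the 4 MB quota are illustrative values only, while RpcServer.getCurrentCall() is the accessor the later RSRpcServices hunks use.

    // Sketch only: accumulate block size on the current call and compare the
    // totals against a response-size quota (the numbers are made up).
    RpcCallContext context = RpcServer.getCurrentCall();   // may be null for local calls
    long maxQuotaResultSize = 4L * 1024 * 1024;            // illustrative limit
    if (context != null) {
      context.incrementResponseBlockSize(64 * 1024);       // e.g. one retained 64 KB HFile block
      boolean overLimit = context.getResponseCellSize() > maxQuotaResultSize
          || context.getResponseBlockSize() > maxQuotaResultSize;
      // when overLimit is true, stop adding results and let the client retry the rest
    }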

RpcServer.java

@@ -319,6 +319,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private RpcCallback callback;
private long responseCellSize = 0;
private long responseBlockSize = 0;
private boolean retryImmediatelySupported;
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
@@ -551,6 +552,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
responseCellSize += cellSize;
}
@Override
public long getResponseBlockSize() {
return responseBlockSize;
}
@Override
public void incrementResponseBlockSize(long blockSize) {
responseBlockSize += blockSize;
}
/**
* If we have a response, and delay is not set, then respond
* immediately. Otherwise, do not respond to client. This is

RSRpcServices.java

@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@@ -658,6 +660,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
IOException sizeIOE = null;
Object lastBlock = null;
for (ClientProtos.Action action : actions.getActionList()) {
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
try {
@@ -665,7 +668,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (context != null
&& context.isRetryImmediatelySupported()
- && context.getResponseCellSize() > maxQuotaResultSize) {
+ && (context.getResponseCellSize() > maxQuotaResultSize
+ || context.getResponseBlockSize() > maxQuotaResultSize)) {
// We're storing the exception since the exception and reason string won't
// change after the response size limit is reached.
@@ -674,15 +678,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Throwing will kill the JVM's JIT.
//
// Instead just create the exception and then store it.
sizeIOE = new MultiActionResultTooLarge("Max response size exceeded: "
+ context.getResponseCellSize());
sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
+ " CellSize: " + context.getResponseCellSize()
+ " BlockSize: " + context.getResponseBlockSize());
// Only report the exception once since there's only one request that
// caused the exception. Otherwise this number will dominate the exceptions count.
rpcServer.getMetrics().exception(sizeIOE);
}
- // Now that there's an exception is know to be created
+ // Now that there's an exception is known to be created
// use it for the response.
//
// This will create a copy in the builder.
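The comments above describe the pattern worth noting here: once the size limit trips, the exception is built a single time, counted in the metrics a single time, and then reused for every remaining action instead of being thrown. A rough sketch of that shape, where overLimit, countException and attachException are hypothetical stand-ins and only MultiActionResultTooLarge comes from this patch:

    // Sketch of the create-once / report-once / reuse pattern described above.
    IOException sizeIOE = null;
    for (ClientProtos.Action action : actions.getActionList()) {
      if (sizeIOE == null && overLimit()) {        // overLimit(): hypothetical size check
        sizeIOE = new MultiActionResultTooLarge("Max size exceeded ...");
        countException(sizeIOE);                   // hypothetical: bump the metric exactly once
      }
      if (sizeIOE != null) {
        attachException(action, sizeIOE);          // hypothetical: attach, never throw
        continue;                                  // no throw means no repeated stack-trace cost
      }
      // otherwise execute the action and keep accumulating its response size
    }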
@@ -755,9 +760,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else {
pbResult = ProtobufUtil.toResult(r);
}
- if (context != null) {
- context.incrementResponseCellSize(Result.getTotalSizeOfCells(r));
- }
+ lastBlock = addSize(context, r, lastBlock);
resultOrExceptionBuilder =
ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
}
@@ -1070,6 +1073,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return 0L;
}
/**
* Method to account for the size of retained cells and retained data blocks.
* @return an object that represents the last referenced block from this response.
*/
Object addSize(RpcCallContext context, Result r, Object lastBlock) {
if (context != null && !r.isEmpty()) {
for (Cell c : r.rawCells()) {
context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
// Since byte buffers can point all kinds of crazy places it's harder to keep track
// of which blocks are kept alive by what byte buffer.
// So we make a guess.
if (c instanceof ByteBufferedCell) {
ByteBufferedCell bbCell = (ByteBufferedCell) c;
ByteBuffer bb = bbCell.getValueByteBuffer();
if (bb != lastBlock) {
context.incrementResponseBlockSize(bb.capacity());
lastBlock = bb;
}
} else {
// We're using the last block being the same as the current block as
// a proxy for pointing to a new block. This won't be exact.
// If there are multiple gets that bounce back and forth
// Then it's possible that this will over count the size of
// referenced blocks. However it's better to over count and
// use two rpcs than to OOME the regionserver.
byte[] valueArray = c.getValueArray();
if (valueArray != lastBlock) {
context.incrementResponseBlockSize(valueArray.length);
lastBlock = valueArray;
}
}
}
}
return lastBlock;
}
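A short usage sketch of addSize as the surrounding hunks wire it in: lastBlock is threaded through a loop over Results so that consecutive cells backed by the same byte[] or ByteBuffer are charged only once, and a change of backing block adds that block's full length or capacity. The results list is a stand-in for whatever the multi or scan path produced.

    // Sketch: thread lastBlock through the loop so shared blocks are counted
    // once per change of block rather than once per cell.
    Object lastBlock = null;
    for (Result r : results) {
      lastBlock = addSize(context, r, lastBlock);
    }
    // Reference equality (bb != lastBlock / valueArray != lastBlock) is the whole
    // heuristic; it can over-count when gets alternate between blocks, which the
    // comment in addSize accepts as the safer failure mode.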
RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
throws LeaseStillHeldException {
Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
@@ -2467,6 +2508,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
assert scanner != null;
RpcCallContext context = RpcServer.getCurrentCall();
Object lastBlock = null;
quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
@@ -2500,11 +2542,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
scanner, results, rows);
if (!results.isEmpty()) {
for (Result r : results) {
- for (Cell cell : r.rawCells()) {
- if (context != null) {
- context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
- }
- }
+ lastBlock = addSize(context, r, lastBlock);
}
}
if (bypass != null && bypass.booleanValue()) {
@@ -2601,13 +2639,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
moreRows = scanner.nextRaw(values, scannerContext);
if (!values.isEmpty()) {
- for (Cell cell : values) {
- if (context != null) {
- context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
- }
- }
final boolean partial = scannerContext.partialResultFormed();
- results.add(Result.create(values, null, stale, partial));
+ Result r = Result.create(values, null, stale, partial);
+ lastBlock = addSize(context, r, lastBlock);
+ results.add(r);
i++;
}

TestMultiRespectsLimits.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -36,6 +37,7 @@ import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import static junit.framework.TestCase.assertEquals;
@@ -73,7 +75,7 @@ public class TestMultiRespectsLimits {
TEST_UTIL.loadTable(t, FAMILY, false);
// Split the table to make sure that the chunking happens across regions.
- try (final Admin admin = TEST_UTIL.getHBaseAdmin()) {
+ try (final Admin admin = TEST_UTIL.getAdmin()) {
admin.split(name);
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
@@ -87,16 +89,79 @@ public class TestMultiRespectsLimits {
for (int i = 0; i < MAX_SIZE; i++) {
gets.add(new Get(HBaseTestingUtility.ROWS[i]));
}
- Result[] results = t.get(gets);
- assertEquals(MAX_SIZE, results.length);
RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer();
BaseSource s = rpcServer.getMetrics().getMetricsSource();
+ long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
+ long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
+ Result[] results = t.get(gets);
+ assertEquals(MAX_SIZE, results.length);
// Cells from TEST_UTIL.loadTable have a length of 27.
// Multiplying by less than that gives an easy lower bound on size.
// However in reality each kv is being reported as much higher than that.
METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, s);
METRICS_ASSERT.assertCounterGt("exceptions",
startingExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
(MAX_SIZE * 25) / MAX_SIZE, s);
startingMultiExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
}
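As an aside on the bound used above, (MAX_SIZE * 25) / MAX_SIZE simply reduces to 25; the long form only documents where the number comes from. A hedged reading, assuming the test class caps the multi response size at MAX_SIZE bytes in setup code that is not part of this hunk:

    // Hedged arithmetic for the assertions above (the MAX_SIZE-byte response
    // cap is assumed from the test setup, which is not shown in this diff):
    //   MAX_SIZE gets * ~27 bytes per loadTable cell, against a ~MAX_SIZE-byte
    //   limit, should force the multi to come back in dozens of over-limit
    //   responses, each counted once, so both exception counters are expected
    //   to climb by more than (MAX_SIZE * 25) / MAX_SIZE == 25 over their
    //   starting values.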
@Test
public void testBlockMultiLimits() throws Exception {
final TableName name = TableName.valueOf("testBlockMultiLimits");
Table t = TEST_UTIL.createTable(name, FAMILY);
final HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
RpcServerInterface rpcServer = regionServer.getRpcServer();
BaseSource s = rpcServer.getMetrics().getMetricsSource();
long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
byte[] row = Bytes.toBytes("TEST");
byte[][] cols = new byte[][]{
Bytes.toBytes("0"), // Get this
Bytes.toBytes("1"), // Buffer
Bytes.toBytes("2"), // Get This
Bytes.toBytes("3"), // Buffer
};
// Set the value size so that one result will be less than the MAX_SIZE
// however the block being referenced will be larger than MAX_SIZE.
// This should cause the regionserver to try and send a result immediately.
byte[] value = new byte[MAX_SIZE - 200];
ThreadLocalRandom.current().nextBytes(value);
for (byte[] col:cols) {
Put p = new Put(row);
p.addImmutable(FAMILY, col, value);
t.put(p);
}
// Make sure that a flush happens
try (final Admin admin = TEST_UTIL.getAdmin()) {
admin.flush(name);
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return regionServer.getOnlineRegions(name).get(0).getMaxFlushedSeqId() > 3;
}
});
}
List<Get> gets = new ArrayList<>(2);
Get g0 = new Get(row);
g0.addColumn(FAMILY, cols[0]);
gets.add(g0);
Get g2 = new Get(row);
g2.addColumn(FAMILY, cols[2]);
gets.add(g2);
Result[] results = t.get(gets);
assertEquals(2, results.length);
METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
startingMultiExceptions, s);
}
}
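The sizing that testBlockMultiLimits relies on, spelled out with the values used above; the premise that all four cells of the row land in the same data block after the flush is the test's own assumption, not something this diff asserts:

    // Rough sizing behind testBlockMultiLimits:
    //   4 columns ("0".."3") x (MAX_SIZE - 200) bytes of value, all in one row,
    //   giving one data block of roughly 4 * (MAX_SIZE - 200) ~= 4 * MAX_SIZE bytes.
    // Each individual Get result stays under MAX_SIZE, but returning either of the
    // two requested cells pins that oversized block, so the block-size accounting
    // added in RSRpcServices should split the two gets across more than one
    // response and increment exceptions.multiResponseTooLarge.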