diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 3e38dbf24b3..eb76748e7f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -82,4 +82,7 @@ public interface RpcCallContext extends Delayable { * onerous. */ void incrementResponseCellSize(long cellSize); + + long getResponseBlockSize(); + void incrementResponseBlockSize(long blockSize); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 1a79e2ecd2c..0011c2e4989 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -318,6 +318,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private InetAddress remoteAddress; private long responseCellSize = 0; + private long responseBlockSize = 0; private boolean retryImmediatelySupported; Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, @@ -541,6 +542,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { responseCellSize += cellSize; } + @Override + public long getResponseBlockSize() { + return responseBlockSize; + } + + @Override + public void incrementResponseBlockSize(long blockSize) { + responseBlockSize += blockSize; + } + /** * If we have a response, and delay is not set, then respond * immediately. Otherwise, do not respond to client. This is diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 8f85f512d3d..f3b4b168f74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -581,6 +581,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); RpcCallContext context = RpcServer.getCurrentCall(); IOException sizeIOE = null; + Object lastBlock = null; for (ClientProtos.Action action : actions.getActionList()) { ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null; try { @@ -588,7 +589,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (context != null && context.isRetryImmediatelySupported() - && context.getResponseCellSize() > maxQuotaResultSize) { + && (context.getResponseCellSize() > maxQuotaResultSize + || context.getResponseBlockSize() > maxQuotaResultSize)) { // We're storing the exception since the exception and reason string won't // change after the response size limit is reached. @@ -597,15 +599,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Throwing will kill the JVM's JIT. // // Instead just create the exception and then store it. - sizeIOE = new MultiActionResultTooLarge("Max response size exceeded: " - + context.getResponseCellSize()); + sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + + " CellSize: " + context.getResponseCellSize() + + " BlockSize: " + context.getResponseBlockSize()); // Only report the exception once since there's only one request that // caused the exception. Otherwise this number will dominate the exceptions count. rpcServer.getMetrics().exception(sizeIOE); } - // Now that there's an exception is know to be created + // Now that there's an exception is known to be created // use it for the response. // // This will create a copy in the builder. @@ -674,9 +677,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else { pbResult = ProtobufUtil.toResult(r); } - if (context != null) { - context.incrementResponseCellSize(Result.getTotalSizeOfCells(r)); - } + lastBlock = addSize(context, r, lastBlock); resultOrExceptionBuilder = ClientProtos.ResultOrException.newBuilder().setResult(pbResult); } @@ -1002,6 +1003,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return scannerId; } + /** + * Method to account for the size of retained cells and retained data blocks. + * @return an object that represents the last referenced block from this response. + */ + Object addSize(RpcCallContext context, Result r, Object lastBlock) { + if (context != null && !r.isEmpty()) { + for (Cell c : r.rawCells()) { + context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c)); + + // We're using the last block being the same as the current block as + // a proxy for pointing to a new block. This won't be exact. + // If there are multiple gets that bounce back and forth + // Then it's possible that this will over count the size of + // referenced blocks. However it's better to over count and + // use two rpcs than to OOME the regionserver. + byte[] rowArray = c.getRowArray(); + if (rowArray != lastBlock) { + context.incrementResponseBlockSize(rowArray.length); + lastBlock = rowArray; + } + } + } + return lastBlock; + } + + /** * Find the HRegion based on a region specifier * @@ -2291,6 +2318,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean closeScanner = false; boolean isSmallScan = false; RpcCallContext context = RpcServer.getCurrentCall(); + Object lastBlock = null; + ScanResponse.Builder builder = ScanResponse.newBuilder(); if (request.hasCloseScanner()) { closeScanner = request.getCloseScanner(); @@ -2379,11 +2408,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, scanner, results, rows); if (!results.isEmpty()) { for (Result r : results) { - for (Cell cell : r.rawCells()) { - if (context != null) { - context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell)); - } - } + lastBlock = addSize(context, r, lastBlock); } } if (bypass != null && bypass.booleanValue()) { @@ -2481,13 +2506,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, moreRows = scanner.nextRaw(values, scannerContext); if (!values.isEmpty()) { - for (Cell cell : values) { - if (context != null) { - context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell)); - } - } final boolean partial = scannerContext.partialResultFormed(); - results.add(Result.create(values, null, stale, partial)); + Result r = Result.create(values, null, stale, partial); + lastBlock = addSize(context, r, lastBlock); + results.add(r); i++; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java index 47dd7beccfd..28e1855b70d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.metrics.BaseSource; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -36,6 +37,7 @@ import org.junit.experimental.categories.Category; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import static junit.framework.TestCase.assertEquals; @@ -73,7 +75,7 @@ public class TestMultiRespectsLimits { TEST_UTIL.loadTable(t, FAMILY, false); // Split the table to make sure that the chunking happens accross regions. - try (final Admin admin = TEST_UTIL.getHBaseAdmin()) { + try (final Admin admin = TEST_UTIL.getAdmin()) { admin.split(name); TEST_UTIL.waitFor(60000, new Waiter.Predicate() { @Override @@ -87,16 +89,79 @@ public class TestMultiRespectsLimits { for (int i = 0; i < MAX_SIZE; i++) { gets.add(new Get(HBaseTestingUtility.ROWS[i])); } - Result[] results = t.get(gets); - assertEquals(MAX_SIZE, results.length); + RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer(); BaseSource s = rpcServer.getMetrics().getMetricsSource(); + long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s); + long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s); + + Result[] results = t.get(gets); + assertEquals(MAX_SIZE, results.length); // Cells from TEST_UTIL.loadTable have a length of 27. // Multiplying by less than that gives an easy lower bound on size. // However in reality each kv is being reported as much higher than that. - METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, s); + METRICS_ASSERT.assertCounterGt("exceptions", + startingExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s); METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", - (MAX_SIZE * 25) / MAX_SIZE, s); + startingMultiExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s); + } + + @Test + public void testBlockMultiLimits() throws Exception { + final TableName name = TableName.valueOf("testBlockMultiLimits"); + Table t = TEST_UTIL.createTable(name, FAMILY); + + final HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0); + RpcServerInterface rpcServer = regionServer.getRpcServer(); + BaseSource s = rpcServer.getMetrics().getMetricsSource(); + long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s); + long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s); + + byte[] row = Bytes.toBytes("TEST"); + byte[][] cols = new byte[][]{ + Bytes.toBytes("0"), // Get this + Bytes.toBytes("1"), // Buffer + Bytes.toBytes("2"), // Get This + Bytes.toBytes("3"), // Buffer + }; + + // Set the value size so that one result will be less than the MAX_SIE + // however the block being reference will be larger than MAX_SIZE. + // This should cause the regionserver to try and send a result immediately. + byte[] value = new byte[MAX_SIZE - 200]; + ThreadLocalRandom.current().nextBytes(value); + + for (byte[] col:cols) { + Put p = new Put(row); + p.addImmutable(FAMILY, col, value); + t.put(p); + } + + // Make sure that a flush happens + try (final Admin admin = TEST_UTIL.getAdmin()) { + admin.flush(name); + TEST_UTIL.waitFor(60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return regionServer.getOnlineRegions(name).get(0).getMaxFlushedSeqId() > 3; + } + }); + } + + List gets = new ArrayList<>(2); + Get g0 = new Get(row); + g0.addColumn(FAMILY, cols[0]); + gets.add(g0); + + Get g2 = new Get(row); + g2.addColumn(FAMILY, cols[2]); + gets.add(g2); + + Result[] results = t.get(gets); + assertEquals(2, results.length); + METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s); + METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", + startingMultiExceptions, s); } }