HBASE-27570 Unify tracking of block IO across all read request types (#5004)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
parent f619001dff
commit 8809c880dd
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

@@ -26,7 +26,6 @@ import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -48,7 +47,6 @@ import java.util.concurrent.atomic.LongAdder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.CacheEvictionStats;
 import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
 import org.apache.hadoop.hbase.Cell;
@@ -767,7 +765,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
     List<ClientProtos.Action> mutations = null;
     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
     IOException sizeIOE = null;
-    Object lastBlock = null;
     ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
       ResultOrException.newBuilder();
     boolean hasResultOrException = false;
@@ -892,7 +889,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
           } else {
             pbResult = ProtobufUtil.toResult(r);
           }
-          lastBlock = addSize(context, r, lastBlock);
+          addSize(context, r);
           hasResultOrException = true;
           resultOrExceptionBuilder.setResult(pbResult);
         }
@@ -1378,44 +1375,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
   }

   /**
-   * Method to account for the size of retained cells and retained data blocks.
-   * @param context rpc call context
-   * @param r result to add size.
-   * @param lastBlock last block to check whether we need to add the block size in context.
-   * @return an object that represents the last referenced block from this response.
+   * Method to account for the size of retained cells.
+   * @param context rpc call context
+   * @param r result to add size.
    */
-  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
+  void addSize(RpcCallContext context, Result r) {
     if (context != null && r != null && !r.isEmpty()) {
       for (Cell c : r.rawCells()) {
         context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
-
-        // Since byte buffers can point all kinds of crazy places it's harder to keep track
-        // of which blocks are kept alive by what byte buffer.
-        // So we make a guess.
-        if (c instanceof ByteBufferExtendedCell) {
-          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
-          ByteBuffer bb = bbCell.getValueByteBuffer();
-          if (bb != lastBlock) {
-            context.incrementResponseBlockSize(bb.capacity());
-            lastBlock = bb;
-          }
-        } else {
-          // We're using the last block being the same as the current block as
-          // a proxy for pointing to a new block. This won't be exact.
-          // If there are multiple gets that bounce back and forth
-          // Then it's possible that this will over count the size of
-          // referenced blocks. However it's better to over count and
-          // use two rpcs than to OOME the regionserver.
-          byte[] valueArray = c.getValueArray();
-          if (valueArray != lastBlock) {
-            context.incrementResponseBlockSize(valueArray.length);
-            lastBlock = valueArray;
-          }
-        }
-
       }
     }
-    return lastBlock;
   }

   /** Returns Remote client's ip and port else null if can't be determined. */
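For reference, the removed block accounting relied on a "last block" identity check: a cell's backing block was only charged when it differed from the block referenced by the previous cell. The standalone sketch below uses hypothetical stand-in types rather than the real Cell/RpcCallContext plumbing, and shows why the heuristic could over-count when gets bounce between two blocks, while never charging blocks retained only by filtered-out cells (which never reach addSize at all).

import java.util.Arrays;
import java.util.List;

public class LastBlockHeuristicSketch {

  // Charge a block only when it differs (by identity) from the block referenced by the
  // previous cell, roughly as the removed addSize(..., lastBlock) did for on-heap cells.
  static long estimateResponseBlockSize(List<byte[]> cellBackingBlocks) {
    long responseBlockSize = 0;
    Object lastBlock = null;
    for (byte[] valueArray : cellBackingBlocks) {
      if (valueArray != lastBlock) {
        responseBlockSize += valueArray.length;
        lastBlock = valueArray;
      }
    }
    return responseBlockSize;
  }

  public static void main(String[] args) {
    byte[] blockA = new byte[55];
    byte[] blockB = new byte[45];
    // Consecutive cells from the same block are charged once: 55 + 45 = 100.
    System.out.println(estimateResponseBlockSize(Arrays.asList(blockA, blockA, blockB))); // 100
    // Cells alternating between two blocks charge blockA twice: 55 + 45 + 55 = 155.
    System.out.println(estimateResponseBlockSize(Arrays.asList(blockA, blockB, blockA))); // 155
  }
}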
@@ -2602,7 +2572,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
         pbr = ProtobufUtil.toResultNoData(r);
         ((HBaseRpcController) controller)
           .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
-        addSize(context, r, null);
+        addSize(context, r);
       } else {
         pbr = ProtobufUtil.toResult(r);
       }
@@ -2989,7 +2959,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
       boolean clientCellBlockSupported = isClientCellBlockSupport(context);
       addResult(builder, result.getResult(), controller, clientCellBlockSupported);
       if (clientCellBlockSupported) {
-        addSize(context, result.getResult(), null);
+        addSize(context, result.getResult());
       }
     } else {
       Result r = null;
@@ -3021,7 +2991,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
       boolean clientCellBlockSupported = isClientCellBlockSupport(context);
       addResult(builder, r, controller, clientCellBlockSupported);
       if (clientCellBlockSupported) {
-        addSize(context, r, null);
+        addSize(context, r);
       }
     }
     return builder.build();
@@ -3373,9 +3343,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
     // of heap size occupied by cells(being read). Cell data means its key and value parts.
     // maxQuotaResultSize - max results just from server side configuration and quotas, without
     // user's specified max. We use this for evaluating limits based on blocks (not cells).
-    // We may have accumulated some results in coprocessor preScannerNext call. We estimate
-    // block and cell size of those using call to addSize. Update our maximums for scanner
-    // context so we can account for them in the real scan.
+    // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
+    // cell or block size from maximum here so we adhere to total limits of request.
+    // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
+    // have accumulated block bytes. If not, this will be 0 for block size.
     long maxCellSize = maxResultSize;
     long maxBlockSize = maxQuotaResultSize;
     if (rpcCall != null) {
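The new comment above spells out the intended arithmetic: whatever cell and block size the coprocessor preScannerNext hook has already accumulated on the RpcCall is subtracted from the request's maximums before the real scan runs. A toy illustration of that budgeting, with purely illustrative names and numbers rather than the actual RSRpcServices fields, might look like:

public class ScanLimitBudgetSketch {
  public static void main(String[] args) {
    // Illustrative numbers only; the real values come from client/server config and quotas.
    long maxCellSize = 2L * 1024 * 1024;   // stands in for maxResultSize
    long maxBlockSize = 4L * 1024 * 1024;  // stands in for maxQuotaResultSize
    long cellBytesFromCpHook = 2_048;      // cell size already added via addSize for CP results
    long blockBytesFromCpHook = 65_536;    // block size tracked while the CP hook ran

    // Budget left for the real scan, so the whole response stays within the limits.
    long remainingCellBudget = maxCellSize - cellBytesFromCpHook;
    long remainingBlockBudget = maxBlockSize - blockBytesFromCpHook;
    System.out.println(remainingCellBudget + " / " + remainingBlockBudget);
  }
}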
@@ -3491,7 +3462,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
         values.clear();
       }
       if (rpcCall != null) {
-        rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress());
         rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
       }
       builder.setMoreResultsInRegion(moreRows);
@@ -3664,18 +3634,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
       if (region.getCoprocessorHost() != null) {
         Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
         if (!results.isEmpty()) {
-          // If scanner CP added results to list, we want to account for cell and block size of
-          // that work. We estimate this using addSize, since CP does not get ScannerContext. If
-          // !done, the actual scan call below will use more accurate ScannerContext block and
-          // cell size tracking for the rest of the request. The two result sets will be added
-          // together in the RpcCall accounting.
-          // This here is just an estimate (see addSize for more details on estimation). We don't
-          // pass lastBlock to the scan call below because the real scan uses ScannerContext,
-          // which does not use lastBlock tracking. This may result in over counting by 1 block,
-          // but that is unlikely since addSize is already a rough estimate.
-          Object lastBlock = null;
           for (Result r : results) {
-            lastBlock = addSize(rpcCall, r, lastBlock);
+            // add cell size from CP results so we can track response size and update limits
+            // when calling scan below if !done. We'll also have tracked block size if the CP
+            // got results from hbase, since StoreScanner tracks that for all calls automatically.
+            addSize(rpcCall, r);
           }
         }
         if (bypass != null && bypass.booleanValue()) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java

@@ -22,6 +22,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.hbase.Cell;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
@@ -573,6 +576,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       scannerContext.clearProgress();
     }

+    Optional<RpcCall> rpcCall =
+      matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();
+
     int count = 0;
     long totalBytesRead = 0;
     boolean onlyFromMemstore = matcher.isUserScan();
@@ -612,7 +618,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
           scannerContext.returnImmediately();
         }

-        heap.recordBlockSize(scannerContext::incrementBlockProgress);
+        heap.recordBlockSize(blockSize -> {
+          if (rpcCall.isPresent()) {
+            rpcCall.get().incrementResponseBlockSize(blockSize);
+          }
+          scannerContext.incrementBlockProgress(blockSize);
+        });

         prevCell = cell;
         scannerContext.setLastPeekedCell(cell);
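This StoreScanner change is the heart of the unification: block sizes are reported once, at the point where the block is actually read, and charged both to the per-request RPC accounting and to the scanner's block progress used for size limits. The minimal sketch below shows that callback pattern with hypothetical stand-ins for RpcCall and ScannerContext (not the real HBase interfaces).

import java.util.Optional;
import java.util.function.LongConsumer;

public class UnifiedBlockTrackingSketch {

  interface RpcCall {
    void incrementResponseBlockSize(long bytes);
  }

  static class ScannerContext {
    long blockProgress;

    void incrementBlockProgress(long bytes) {
      blockProgress += bytes;
    }
  }

  static LongConsumer blockSizeConsumer(Optional<RpcCall> rpcCall, ScannerContext ctx) {
    // Report each block once; charge it to the RPC (if this is a user request) and to the
    // scanner's progress, mirroring the lambda passed to heap.recordBlockSize above.
    return blockSize -> {
      if (rpcCall.isPresent()) {
        rpcCall.get().incrementResponseBlockSize(blockSize);
      }
      ctx.incrementBlockProgress(blockSize);
    };
  }

  public static void main(String[] args) {
    ScannerContext ctx = new ScannerContext();
    long[] rpcTotal = new long[1];
    RpcCall rpc = bytes -> rpcTotal[0] += bytes; // stand-in for the real per-request call
    LongConsumer onBlockRead = blockSizeConsumer(Optional.of(rpc), ctx);
    onBlockRead.accept(55); // first HFile block read for this request
    onBlockRead.accept(45); // second block
    System.out.println(rpcTotal[0] + " / " + ctx.blockProgress); // 100 / 100
  }
}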

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java

@@ -19,11 +19,13 @@ package org.apache.hadoop.hbase.client;

 import static junit.framework.TestCase.assertEquals;

+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.CompatibilityFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -32,10 +34,13 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.logging.Log4jUtils;
 import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -64,7 +69,7 @@ public class TestMultiRespectsLimits {
   private static final MetricsAssertHelper METRICS_ASSERT =
     CompatibilityFactory.getInstance(MetricsAssertHelper.class);
   private final static byte[] FAMILY = Bytes.toBytes("D");
-  public static final int MAX_SIZE = 100;
+  public static final int MAX_SIZE = 90;
   private static String LOG_LEVEL;

   @Rule
@@ -152,6 +157,10 @@
       Bytes.toBytes("3"), // Get This
       Bytes.toBytes("4"), // Buffer
       Bytes.toBytes("5"), // Buffer
+      Bytes.toBytes("6"), // Buffer
+      Bytes.toBytes("7"), // Get This
+      Bytes.toBytes("8"), // Buffer
+      Bytes.toBytes("9"), // Buffer
     };

     // Set the value size so that one result will be less than the MAX_SIZE
@@ -160,7 +169,12 @@
     byte[] value = new byte[1];
     Bytes.random(value);

-    for (byte[] col : cols) {
+    for (int i = 0; i < cols.length; i++) {
+      if (i == 6) {
+        // do a flush here so we end up with 2 blocks, 55 and 45 bytes
+        flush(regionServer, tableName);
+      }
+      byte[] col = cols[i];
       Put p = new Put(row);
       p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row).setFamily(FAMILY)
         .setQualifier(col).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put).setValue(value)
@@ -169,28 +183,43 @@
     }

     // Make sure that a flush happens
-    try (final Admin admin = TEST_UTIL.getAdmin()) {
-      admin.flush(tableName);
-      TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return regionServer.getRegions(tableName).get(0).getMaxFlushedSeqId() > 3;
-        }
-      });
-    }
+    flush(regionServer, tableName);

-    List<Get> gets = new ArrayList<>(2);
-    Get g0 = new Get(row);
-    g0.addColumn(FAMILY, cols[0]);
+    List<Get> gets = new ArrayList<>(4);
+    // This get returns nothing since the filter doesn't match. Filtered cells still retain
+    // blocks, and this is a full row scan of both blocks. This equals 100 bytes so we should
+    // throw a multiResponseTooLarge after this get if we are counting filtered cells correctly.
+    Get g0 = new Get(row).addFamily(FAMILY).setFilter(
+      new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("sdf"))));
     gets.add(g0);

+    // g1 and g2 each count the first 55 byte block, so we end up with block size of 110
+    // after g2 and throw a multiResponseTooLarge before g3
+    Get g1 = new Get(row);
+    g1.addColumn(FAMILY, cols[0]);
+    gets.add(g1);
+
     Get g2 = new Get(row);
     g2.addColumn(FAMILY, cols[3]);
     gets.add(g2);

+    Get g3 = new Get(row);
+    g3.addColumn(FAMILY, cols[7]);
+    gets.add(g3);
+
     Result[] results = t.get(gets);
-    assertEquals(2, results.length);
-    METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s);
-    METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", startingMultiExceptions, s);
+    assertEquals(4, results.length);
+    // Expect 2 exceptions (thus 3 rpcs) -- one for g0, then another for g1 + g2, final rpc for g3.
+    // If we tracked lastBlock we could squeeze g3 into the second rpc because g2 would be "free"
+    // since it's in the same block as g1.
+    METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions + 1, s);
+    METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", startingMultiExceptions + 1,
+      s);
   }
+
+  private void flush(HRegionServer regionServer, TableName tableName) throws IOException {
+    for (HRegion region : regionServer.getRegions(tableName)) {
+      region.flush(true);
+    }
+  }
 }
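The updated test's expectations follow from simple arithmetic with MAX_SIZE = 90 and the two flushed blocks of 55 and 45 bytes. The simplified model below (not the actual multi code path) reproduces the expected 3 RPCs and 2 multiResponseTooLarge exceptions; the per-get block costs are taken from the test comments above.

import java.util.Arrays;
import java.util.List;

public class MultiResponseChunkingSketch {
  public static void main(String[] args) {
    int maxSize = 90; // MAX_SIZE in the test
    // Block bytes each get is expected to be charged for, per the test comments.
    List<int[]> blockCostPerGet = Arrays.asList(
      new int[] { 55, 45 }, // g0: filtered full-row get still reads both blocks
      new int[] { 55 },     // g1: first block
      new int[] { 55 },     // g2: same block as g1, counted again (no lastBlock dedup)
      new int[] { 45 });    // g3: second block
    int rpcs = 1;
    long accumulated = 0;
    for (int[] blocks : blockCostPerGet) {
      for (int b : blocks) {
        accumulated += b;
      }
      if (accumulated > maxSize) { // quota exceeded: remaining gets move to a new RPC
        rpcs++;
        accumulated = 0;
      }
    }
    System.out.println(rpcs + " rpcs, " + (rpcs - 1) + " multiResponseTooLarge exceptions"); // 3 rpcs, 2 exceptions
  }
}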