HBASE-27558 Scan quotas and limits should account for total block IO (#4967)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Bryan Beaudreault 2023-01-30 09:27:45 -05:00 committed by GitHub
parent 382681e2d6
commit 76207257bb
15 changed files with 553 additions and 30 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.IntConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
@ -277,6 +278,11 @@ public class HalfStoreFileReader extends StoreFileReader {
public void shipped() throws IOException {
this.delegate.shipped();
}
@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
this.delegate.recordBlockSize(blockSizeConsumer);
}
};
}

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Optional;
import java.util.function.IntConsumer;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -336,6 +337,9 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
// RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
// unreferenced block please.
protected HFileBlock curBlock;
// Whether we returned a result for curBlock's size in recordBlockSize().
// gets reset whenever curBlock is changed.
private boolean providedCurrentBlockSize = false;
// Previous blocks that were used in the course of the read
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();
@ -355,6 +359,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
prevBlocks.add(this.curBlock);
}
this.curBlock = block;
this.providedCurrentBlockSize = false;
}
void reset() {
@ -415,6 +420,14 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
this.returnBlocks(true);
}
@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
if (!providedCurrentBlockSize && curBlock != null) {
providedCurrentBlockSize = true;
blockSizeConsumer.accept(curBlock.getUncompressedSizeWithoutHeader());
}
}
// Returns the #bytes in HFile for the current cell. Used to skip these many bytes in current
// HFile block's buffer so as to position to the next cell.
private int getCurCellSerializedSize() {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.IntConsumer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.yetus.audience.InterfaceAudience;
@ -140,4 +141,11 @@ public interface HFileScanner extends Shipper, Closeable {
*/
@Override
void close();
/**
Record the size of the current block in bytes, passing it as an argument to the blockSizeConsumer.
Implementations should ensure that blockSizeConsumer is called at most once per block.
* @param blockSizeConsumer to be called with block size in bytes, once per block.
*/
void recordBlockSize(IntConsumer blockSizeConsumer);
}
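A usage sketch of the new contract (a hypothetical helper, not part of this commit): a caller can sum total block IO for a scan by invoking recordBlockSize for every cell and relying on the at-most-once-per-block behavior.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

public final class BlockIoAccounting {
  // Sums the uncompressed bytes of every block touched while iterating the whole file.
  public static long totalBlockBytes(org.apache.hadoop.hbase.io.hfile.HFileScanner scanner)
      throws IOException {
    AtomicLong blockBytes = new AtomicLong();
    if (!scanner.seekTo()) {
      return 0L; // empty file, nothing read
    }
    do {
      // Safe to call for every cell: only the first call against a given block reports its size.
      scanner.recordBlockSize(blockBytes::addAndGet);
    } while (scanner.next());
    return blockBytes.get();
  }
}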

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.IntConsumer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
@ -104,6 +105,11 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
return !this.current.isFileScanner();
}
@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
this.current.recordBlockSize(blockSizeConsumer);
}
@Override
public Cell next() throws IOException {
if (this.current == null) {

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.Closeable;
import java.io.IOException;
import java.util.function.IntConsumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
@ -125,6 +126,13 @@ public interface KeyValueScanner extends Shipper, Closeable {
/** Returns true if this is a file scanner. Otherwise a memory scanner is assumed. */
boolean isFileScanner();
/**
Record the size of the current block in bytes, passing it as an argument to the blockSizeConsumer.
Implementations should ensure that blockSizeConsumer is called at most once per block.
* @param blockSizeConsumer to be called with block size in bytes, once per block.
*/
void recordBlockSize(IntConsumer blockSizeConsumer);
/**
* @return the file path if this is a file scanner, otherwise null.
* @see #isFileScanner()

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.function.IntConsumer;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -63,6 +64,11 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner {
return false;
}
@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
// do nothing
}
@Override
public Path getFilePath() {
// Not a file by default.

View File

@ -42,7 +42,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -3282,8 +3281,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
// return whether we have more results in region.
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
throws IOException {
ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
HRegion region = rsh.r;
RegionScanner scanner = rsh.s;
long maxResultSize;
@ -3343,7 +3341,19 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
// maxResultSize - either we can reach this much size for all cells(being read) data or sum
// of heap size occupied by cells(being read). Cell data means its key and value parts.
contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
// maxQuotaResultSize - the max result size derived only from server-side configuration and
// quotas, without the user's specified max. We use this for evaluating limits based on blocks
// (not cells).
// We may have accumulated some results in the coprocessor preScannerNext call. We estimate the
// block and cell size of those using a call to addSize. Update our maximums for the scanner
// context so we can account for them in the real scan.
long maxCellSize = maxResultSize;
long maxBlockSize = maxQuotaResultSize;
if (rpcCall != null) {
maxBlockSize -= rpcCall.getResponseBlockSize();
maxCellSize -= rpcCall.getResponseCellSize();
}
contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
contextBuilder.setBatchLimit(scanner.getBatch());
contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);
@ -3398,7 +3408,6 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
}
boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
results.add(r);
numOfResults++;
if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
@ -3427,12 +3436,18 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
if (limitReached || !moreRows) {
// With a block size limit, we may exceed the size limit without collecting any results.
// In this case we want to send a heartbeat and/or cursor. We don't want to send a heartbeat
// or cursor if results were collected, for example when the cell size or heap size limit was hit.
boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
// We only want to mark a ScanResponse as a heartbeat message in the event that
// there are more values to be read server side. If there aren't more values,
// marking it as a heartbeat is wasteful because the client will need to issue
// another ScanRequest only to realize that they already have all the values
if (moreRows && timeLimitReached) {
// Heartbeat messages occur when the time limit has been reached.
if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
// Heartbeat messages occur when the time limit has been reached, or size limit has
// been reached before collecting any results. This can happen for heavily filtered
// scans which scan over too many blocks.
builder.setHeartbeatMessage(true);
if (rsh.needCursor) {
Cell cursorCell = scannerContext.getLastPeekedCell();
@ -3445,6 +3460,10 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
}
values.clear();
}
if (rpcCall != null) {
rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress());
rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
}
builder.setMoreResultsInRegion(moreRows);
// Check to see if the client requested that we track metrics server side. If the
// client requested metrics, retrieve the metrics from the scanner context.
@ -3606,7 +3625,6 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
} else {
limitOfRows = -1;
}
MutableObject<Object> lastBlock = new MutableObject<>();
boolean scannerClosed = false;
try {
List<Result> results = new ArrayList<>(Math.min(rows, 512));
@ -3616,8 +3634,18 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
if (region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
if (!results.isEmpty()) {
// If the scanner CP added results to the list, we want to account for the cell and block size
// of that work. We estimate this using addSize, since the CP does not get a ScannerContext. If
// !done, the actual scan call below will use the more accurate ScannerContext block and cell
// size tracking for the rest of the request. The two result sets are added together in the
// RpcCall accounting.
// This is just an estimate (see addSize for more details on estimation). We don't pass
// lastBlock to the scan call below because the real scan uses ScannerContext, which does not
// use lastBlock tracking. This may result in over-counting by 1 block, but that is unlikely
// since addSize is already a rough estimate.
Object lastBlock = null;
for (Result r : results) {
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
lastBlock = addSize(rpcCall, r, lastBlock);
}
}
if (bypass != null && bypass.booleanValue()) {
@ -3626,7 +3654,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
}
if (!done) {
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
results, builder, lastBlock, rpcCall);
results, builder, rpcCall);
} else {
builder.setMoreResultsInRegion(!results.isEmpty());
}
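A rough sketch of the accounting above (a simplification that reuses the variable names from this diff; it is not the literal RSRpcServices code): cells remain capped by the effective result size, the new block cap comes only from server configuration and quotas, and both budgets shrink by whatever earlier results already charged to the RpcCall.

// Simplified sketch under the assumptions stated above; not part of this commit.
final class ScanSizeLimits {
  final long maxCellSize;
  final long maxBlockSize;

  ScanSizeLimits(long maxResultSize, long maxQuotaResultSize, long responseCellSize,
    long responseBlockSize) {
    // Cells: bounded by the smaller of the client's and the server's max result size.
    this.maxCellSize = maxResultSize - responseCellSize;
    // Blocks: bounded only by server config / quota, ignoring the client-requested max.
    this.maxBlockSize = maxQuotaResultSize - responseBlockSize;
  }
}

After the region scan runs, the ScannerContext's block and heap progress are folded back onto the RpcCall (the incrementResponseBlockSize / incrementResponseCellSize calls above), so the coprocessor estimate and the exact tracking from the real scan end up added together for the same call.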

View File

@ -503,7 +503,8 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
results.clear();
// Read nothing as the rowkey was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
// We also check the size limit because we might have read blocks while getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
@ -561,8 +562,9 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
if (!shouldStop) {
// Read nothing as the cells was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
// Read nothing as the cells were filtered, but we still need to check the time limit.
// We also check the size limit because we might have read blocks while getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
@ -608,6 +610,13 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
if (!shouldStop) {
// We check the size limit because we might have read blocks in the nextRow call above, or
// in the populateResults call. Only scans with hasFilterRow should reach this point,
// and for those scans which filter row _cells_ this is the only place we can actually
// enforce that the scan does not exceed limits since it bypasses all other checks above.
if (scannerContext.checkSizeLimit(limitScope)) {
return true;
}
continue;
}
}
@ -705,13 +714,21 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
// Enable skipping row mode, which disables limits and skips tracking progress for all
// but block size. We keep tracking block size because skipping a row in this way
// might involve reading blocks along the way.
scannerContext.setSkippingRow(true);
Cell next;
while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
this.storeHeap.next(MOCKED_LIST);
this.storeHeap.next(MOCKED_LIST, scannerContext);
}
scannerContext.setSkippingRow(false);
resetFilters();
// Calling the hook in CP which allows it to do a fast forward

View File

@ -103,6 +103,13 @@ public class ScannerContext {
boolean keepProgress;
private static boolean DEFAULT_KEEP_PROGRESS = false;
/**
Allows temporarily ignoring limits and skipping tracking of batch and size progress. Used when
skipping to the next row, in which case all processed cells are thrown away and so should not
count towards progress.
*/
boolean skippingRow = false;
private Cell lastPeekedCell = null;
// Set this to true will have the same behavior with reaching the time limit.
@ -123,7 +130,7 @@ public class ScannerContext {
}
// Progress fields are initialized to 0
progress = new ProgressFields(0, 0, 0);
progress = new ProgressFields(0, 0, 0, 0);
this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE;
@ -148,7 +155,9 @@ public class ScannerContext {
* @return true if the progress tracked so far in this instance will be considered during an
* invocation of {@link InternalScanner#next(java.util.List)} or
* {@link RegionScanner#next(java.util.List)}. false when the progress tracked so far
* should not be considered and should instead be wiped away via {@link #clearProgress()}
* should not be considered and should instead be wiped away via {@link #clearProgress()}.
* This only applies to per-row progress, like batch and data/heap size. Block size is
* never reset because it tracks all of the blocks scanned for an entire request.
*/
boolean getKeepProgress() {
return keepProgress;
@ -158,10 +167,32 @@ public class ScannerContext {
this.keepProgress = keepProgress;
}
/**
In this mode, only block size progress is tracked, and limits are ignored. We set this mode
when skipping to the next row, in which case all cells returned are thrown away and so should
not count towards progress.
* @return true if we are in skipping row mode.
*/
public boolean getSkippingRow() {
return skippingRow;
}
/**
@param skippingRow set true to disable collecting per-cell progress and enforcing any limits.
This is used when skipping over all cells in a row, in which case those cells
are thrown away and should not count towards progress.
*/
void setSkippingRow(boolean skippingRow) {
this.skippingRow = skippingRow;
}
/**
* Progress towards the batch limit has been made. Increment internal tracking of batch progress
*/
void incrementBatchProgress(int batch) {
if (skippingRow) {
return;
}
int currentBatch = progress.getBatch();
progress.setBatch(currentBatch + batch);
}
@ -170,12 +201,25 @@ public class ScannerContext {
* Progress towards the size limit has been made. Increment internal tracking of size progress
*/
void incrementSizeProgress(long dataSize, long heapSize) {
if (skippingRow) {
return;
}
long curDataSize = progress.getDataSize();
progress.setDataSize(curDataSize + dataSize);
long curHeapSize = progress.getHeapSize();
progress.setHeapSize(curHeapSize + heapSize);
}
/**
Progress towards the block limit has been made. Increment internal tracking of block progress
*/
void incrementBlockProgress(int blockSize) {
if (blockSize > 0) {
long curBlockSize = progress.getBlockSize();
progress.setBlockSize(curBlockSize + blockSize);
}
}
int getBatchProgress() {
return progress.getBatch();
}
@ -188,6 +232,10 @@ public class ScannerContext {
return progress.getHeapSize();
}
long getBlockSizeProgress() {
return progress.getBlockSize();
}
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
setBatchProgress(batchProgress);
setSizeProgress(sizeProgress, heapSizeProgress);
@ -204,10 +252,12 @@ public class ScannerContext {
/**
* Clear away any progress that has been made so far. All progress fields are reset to initial
* values
* values. Only clears progress that should reset between rows. {@link #getBlockSizeProgress()} is
* not reset because it increments for all blocks scanned whether the result is included or
* filtered.
*/
void clearProgress() {
progress.setFields(0, 0, 0);
progress.setFields(0, 0, 0, getBlockSizeProgress());
}
/**
@ -244,7 +294,7 @@ public class ScannerContext {
/** Returns true if the size limit can be enforced in the checker's scope */
boolean hasSizeLimit(LimitScope checkerScope) {
return limits.canEnforceSizeLimitFromScope(checkerScope)
&& (limits.getDataSize() > 0 || limits.getHeapSize() > 0);
&& (limits.getDataSize() > 0 || limits.getHeapSize() > 0 || limits.getBlockSize() > 0);
}
/** Returns true if the time limit can be enforced in the checker's scope */
@ -289,7 +339,7 @@ public class ScannerContext {
* @return true when the limit is enforceable from the checker's scope and it has been reached
*/
boolean checkBatchLimit(LimitScope checkerScope) {
return hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch();
return !skippingRow && hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch();
}
/**
@ -297,8 +347,10 @@ public class ScannerContext {
* @return true when the limit is enforceable from the checker's scope and it has been reached
*/
boolean checkSizeLimit(LimitScope checkerScope) {
return hasSizeLimit(checkerScope) && (progress.getDataSize() >= limits.getDataSize()
|| progress.getHeapSize() >= limits.getHeapSize());
return !skippingRow && hasSizeLimit(checkerScope)
&& (progress.getDataSize() >= limits.getDataSize()
|| progress.getHeapSize() >= limits.getHeapSize()
|| progress.getBlockSize() >= limits.getBlockSize());
}
/**
@ -307,7 +359,7 @@ public class ScannerContext {
* @return true when the limit is enforceable from the checker's scope and it has been reached
*/
boolean checkTimeLimit(LimitScope checkerScope) {
return hasTimeLimit(checkerScope)
return !skippingRow && hasTimeLimit(checkerScope)
&& (returnImmediately || EnvironmentEdgeManager.currentTime() >= limits.getTime());
}
@ -383,10 +435,12 @@ public class ScannerContext {
return this;
}
public Builder setSizeLimit(LimitScope sizeScope, long dataSizeLimit, long heapSizeLimit) {
public Builder setSizeLimit(LimitScope sizeScope, long dataSizeLimit, long heapSizeLimit,
long blockSizeLimit) {
limits.setDataSize(dataSizeLimit);
limits.setHeapSize(heapSizeLimit);
limits.setSizeScope(sizeScope);
limits.setBlockSize(blockSizeLimit);
return this;
}
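For illustration, a minimal sketch of the new four-argument size limit (placeholder scope and limit values, and same-package access assumed since ScannerContext is server-internal):

// maxCellSize, maxBlockSize and timeLimit are placeholders supplied by the caller.
ScannerContext ctx = ScannerContext.newBuilder(true)
  .setSizeLimit(LimitScope.BETWEEN_CELLS, maxCellSize, maxCellSize, maxBlockSize)
  .setTimeLimit(LimitScope.BETWEEN_CELLS, timeLimit)
  .build();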
@ -532,6 +586,9 @@ public class ScannerContext {
// The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as
// such AND data cells of Cells which are in on heap area.
long heapSize = DEFAULT_SIZE;
// The total amount of block bytes that have been loaded in order to process cells for the
// request.
long blockSize = DEFAULT_SIZE;
LimitScope timeScope = DEFAULT_SCOPE;
long time = DEFAULT_TIME;
@ -545,19 +602,21 @@ public class ScannerContext {
void copy(LimitFields limitsToCopy) {
if (limitsToCopy != null) {
setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getDataSize(),
limitsToCopy.getHeapSize(), limitsToCopy.getTimeScope(), limitsToCopy.getTime());
limitsToCopy.getHeapSize(), limitsToCopy.getBlockSize(), limitsToCopy.getTimeScope(),
limitsToCopy.getTime());
}
}
/**
* Set all fields together.
*/
void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize,
void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize, long blockSize,
LimitScope timeScope, long time) {
setBatch(batch);
setSizeScope(sizeScope);
setDataSize(dataSize);
setHeapSize(heapSize);
setBlockSize(blockSize);
setTimeScope(timeScope);
setTime(time);
}
@ -583,6 +642,10 @@ public class ScannerContext {
return this.heapSize;
}
long getBlockSize() {
return this.blockSize;
}
void setDataSize(long dataSize) {
this.dataSize = dataSize;
}
@ -591,6 +654,10 @@ public class ScannerContext {
this.heapSize = heapSize;
}
void setBlockSize(long blockSize) {
this.blockSize = blockSize;
}
/** Returns {@link LimitScope} indicating scope in which the size limit is enforced */
LimitScope getSizeScope() {
return this.sizeScope;
@ -647,6 +714,9 @@ public class ScannerContext {
sb.append(", heapSize:");
sb.append(heapSize);
sb.append(", blockSize:");
sb.append(blockSize);
sb.append(", sizeScope:");
sb.append(sizeScope);
@ -675,18 +745,22 @@ public class ScannerContext {
// The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as
// such AND data cells of Cells which are in on heap area.
long heapSize = DEFAULT_SIZE;
// The total amount of block bytes that have been loaded in order to process cells for the
// request.
long blockSize = DEFAULT_SIZE;
ProgressFields(int batch, long size, long heapSize) {
setFields(batch, size, heapSize);
ProgressFields(int batch, long size, long heapSize, long blockSize) {
setFields(batch, size, heapSize, blockSize);
}
/**
* Set all fields together.
*/
void setFields(int batch, long dataSize, long heapSize) {
void setFields(int batch, long dataSize, long heapSize, long blockSize) {
setBatch(batch);
setDataSize(dataSize);
setHeapSize(heapSize);
setBlockSize(blockSize);
}
int getBatch() {
@ -705,10 +779,18 @@ public class ScannerContext {
return this.heapSize;
}
long getBlockSize() {
return this.blockSize;
}
void setDataSize(long dataSize) {
this.dataSize = dataSize;
}
void setBlockSize(long blockSize) {
this.blockSize = blockSize;
}
void setHeapSize(long heapSize) {
this.heapSize = heapSize;
}
@ -727,6 +809,9 @@ public class ScannerContext {
sb.append(", heapSize:");
sb.append(heapSize);
sb.append(", blockSize:");
sb.append(blockSize);
sb.append("}");
return sb.toString();
}
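To make the retention semantics concrete, a small sketch (hypothetical, and assuming same-package access since these members are package-private): clearProgress() wipes per-row data/heap progress but keeps block progress, and skipping-row mode suppresses everything except block accounting.

ScannerContext ctx = ScannerContext.newBuilder().build();

ctx.incrementSizeProgress(100, 200);  // cell data / heap bytes for the current row
ctx.incrementBlockProgress(4096);     // a block opened while reading that row

ctx.clearProgress();                  // runs between rows when progress is not kept
assert ctx.getDataSizeProgress() == 0;     // per-row progress resets
assert ctx.getBlockSizeProgress() == 4096; // block IO keeps accumulating for the request

ctx.setSkippingRow(true);
ctx.incrementSizeProgress(100, 200);  // ignored while skipping the row
ctx.incrementBlockProgress(4096);     // still counted: skipping a row may read blocks
ctx.setSkippingRow(false);
assert ctx.getBlockSizeProgress() == 8192;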

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.function.IntConsumer;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -286,6 +287,11 @@ public class SegmentScanner implements KeyValueScanner {
return false;
}
@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
// do nothing
}
@Override
public Path getFilePath() {
return null;

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@ -450,6 +451,11 @@ public class StoreFileScanner implements KeyValueScanner {
return true;
}
@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
hfs.recordBlockSize(blockSizeConsumer);
}
@Override
public Path getFilePath() {
return reader.getHFileReader().getPath();

View File

@ -569,7 +569,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Clear progress away unless invoker has indicated it should be kept.
if (!scannerContext.getKeepProgress()) {
if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) {
scannerContext.clearProgress();
}
@ -612,6 +612,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// here, we still need to scan all the qualifiers before returning...
scannerContext.returnImmediately();
}
heap.recordBlockSize(scannerContext::incrementBlockProgress);
prevCell = cell;
scannerContext.setLastPeekedCell(cell);
topChanged = false;
@ -750,6 +753,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
default:
throw new RuntimeException("UNEXPECTED");
}
// One last chance to break due to size limit. The INCLUDE* cases above already check
// limit and continue. For the various filtered cases, we need to check because block
// size limit may have been exceeded even if we don't add cells to result list.
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
} while ((cell = this.heap.peek()) != null);
if (count > 0) {

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -81,6 +83,39 @@ public class TestHFileReaderImpl {
return ncTFile;
}
/**
* Test that we only count block size once per block while scanning
*/
@Test
public void testRecordBlockSize() throws IOException {
Path p = makeNewFile();
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration conf = TEST_UTIL.getConfiguration();
HFile.Reader reader = HFile.createReader(fs, p, CacheConfig.DISABLED, true, conf);
try (HFileReaderImpl.HFileScannerImpl scanner =
(HFileReaderImpl.HFileScannerImpl) reader.getScanner(conf, true, true, false)) {
scanner.seekTo();
scanner.recordBlockSize(
size -> assertTrue("expected non-zero block size on first request", size > 0));
scanner.recordBlockSize(
size -> assertEquals("expected zero block size on second request", 0, (int) size));
AtomicInteger blocks = new AtomicInteger(0);
while (scanner.next()) {
scanner.recordBlockSize(size -> {
blocks.incrementAndGet();
// there's only 2 cells in the second block
assertTrue("expected remaining block to be less than block size",
size < toKV("a").getLength() * 3);
});
}
assertEquals("expected only one remaining block but got " + blocks.get(), 1, blocks.get());
}
}
@Test
public void testSeekBefore() throws Exception {
Path p = makeNewFile();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.function.IntConsumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
@ -113,4 +114,9 @@ public class DelegatingKeyValueScanner implements KeyValueScanner {
public Cell getNextIndexedKey() {
return delegate.getNextIndexedKey();
}
@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
delegate.recordBlockSize(blockSizeConsumer);
}
}

View File

@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ LargeTests.class })
public class TestScannerBlockSizeLimits {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestScannerBlockSizeLimits.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final TableName TABLE = TableName.valueOf("TestScannerBlockSizeLimits");
private static final byte[] FAMILY1 = Bytes.toBytes("0");
private static final byte[] FAMILY2 = Bytes.toBytes("1");
private static final byte[] DATA = new byte[1000];
private static final byte[][] FAMILIES = new byte[][] { FAMILY1, FAMILY2 };
private static final byte[] COLUMN1 = Bytes.toBytes(0);
private static final byte[] COLUMN2 = Bytes.toBytes(1);
private static final byte[] COLUMN3 = Bytes.toBytes(2);
private static final byte[] COLUMN4 = Bytes.toBytes(4);
private static final byte[] COLUMN5 = Bytes.toBytes(5);
private static final byte[][] COLUMNS = new byte[][] { COLUMN1, COLUMN2 };
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4200);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE, FAMILIES, 1, 2048);
createTestData();
}
private static void createTestData() throws IOException, InterruptedException {
RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
HRegion region = TEST_UTIL.getRSForFirstRegionInTable(TABLE).getRegion(regionName);
for (int i = 1; i < 10; i++) {
// 5 columns per row, in 2 families
// Each column value is 1000 bytes, which is enough to fill a full block with row and header.
// So 5 blocks per row.
Put put = new Put(Bytes.toBytes(i));
for (int j = 0; j < 6; j++) {
put.addColumn(FAMILY1, Bytes.toBytes(j), DATA);
}
put.addColumn(FAMILY2, COLUMN1, DATA);
region.put(put);
if (i % 2 == 0) {
region.flush(true);
}
}
// we've created 10 storefiles at this point, 5 per family
region.flush(true);
}
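A hedged aside on the numbers used below (the key string is assumed from current HBase sources, and the block arithmetic is approximate): the 4200-byte cap set in setUp() is the server-side max result size, which this commit also applies to block IO via maxQuotaResultSize. With 2048-byte blocks holding ~1000-byte values, two blocks (~4 KB) stay under the cap and a third is loaded before the limit trips, which is where the "3 blocks per RPC" figure in the assertions below comes from.

// Equivalent to the conf.setInt(...) line in setUp(), with the raw key spelled out
// (assumed value of HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY).
conf.setInt("hbase.server.scanner.max.result.size", 4200);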
/**
Simplest test that ensures we don't count block sizes too much. These 2 requested cells are in
the same block, so they should be returned in 1 request. If we mis-counted blocks, they'd come
back in 2 requests.
*/
@Test
public void testSingleBlock() throws IOException {
Table table = TEST_UTIL.getConnection().getTable(TABLE);
ResultScanner scanner =
table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1)).withStopRow(Bytes.toBytes(2))
.addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM));
ScanMetrics metrics = scanner.getScanMetrics();
scanner.next(100);
assertEquals(1, metrics.countOfRowsScanned.get());
assertEquals(1, metrics.countOfRPCcalls.get());
}
/**
Tests that we check the size limit after filterRowKey. When a row is excluded by filterRowKey,
we call nextRow to skip to the next row. This should be efficient in this case, but we still
need to check size limits after each row is processed. So in this test, we accumulate some
block IO reading row 1, then skip row 2 and should return early at that point. The next rpc
call starts with row 3's blocks loaded, so it can return the whole row in one rpc. If we were
not checking size limits, we'd have been able to load an extra row 3 cell into the first rpc
and thus split row 3 across multiple Results.
*/
@Test
public void testCheckLimitAfterFilterRowKey() throws IOException {
Table table = TEST_UTIL.getConnection().getTable(TABLE);
ResultScanner scanner = table.getScanner(getBaseScan().addColumn(FAMILY1, COLUMN1)
.addColumn(FAMILY1, COLUMN2).addColumn(FAMILY1, COLUMN3).addFamily(FAMILY2)
.setFilter(new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(2)))));
boolean foundRow3 = false;
for (Result result : scanner) {
Set<Integer> rows = new HashSet<>();
for (Cell cell : result.rawCells()) {
rows.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
}
if (rows.contains(3)) {
assertFalse("expected row3 to come all in one result, but found it in two results",
foundRow3);
assertEquals(1, rows.size());
foundRow3 = true;
}
}
ScanMetrics metrics = scanner.getScanMetrics();
// 4 blocks per row, so 36 blocks. We can scan 3 blocks per RPC, which is 12 RPCs. But we can
// skip 1 row, so skip 2 RPCs.
assertEquals(10, metrics.countOfRPCcalls.get());
}
/**
After RegionScannerImpl.populateResults, row filters are run. If the row is excluded due to
filter.filterRow(), nextRow() is called, which might accumulate more block IO. This validates
that we still honor block limits in that case.
*/
@Test
public void testCheckLimitAfterFilteringRowCellsDueToFilterRow() throws IOException {
Table table = TEST_UTIL.getConnection().getTable(TABLE);
ResultScanner scanner = table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true)
.addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM)
.setFilter(new SkipFilter(new QualifierFilter(CompareOperator.EQUAL,
new BinaryComparator(Bytes.toBytes("dasfasf"))))));
// Our filter doesn't end up matching any real columns, so expect only cursors
for (Result result : scanner) {
assertTrue(result.isCursor());
}
ScanMetrics metrics = scanner.getScanMetrics();
// scanning over 9 rows, filtering on 2 contiguous columns each, so 9 blocks total
// limited to 4200 bytes per RPC, which is enough for 3 blocks (we exceed the limit after
// loading the 3rd), so that's 3 RPCs plus a last RPC that pulls the cells loaded by the last block
assertEquals(4, metrics.countOfRPCcalls.get());
}
/**
At the end of the loop in StoreScanner, we do one more check of size limits. This is to catch
the block size being exceeded while filtering cells within a store. This test ensures that we
do that; otherwise we'd see no cursors below.
*/
@Test
public void testCheckLimitAfterFilteringCell() throws IOException {
Table table = TEST_UTIL.getConnection().getTable(TABLE);
ResultScanner scanner = table.getScanner(getBaseScan()
.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(COLUMN2))));
int cursors = 0;
for (Result result : scanner) {
if (result.isCursor()) {
cursors++;
}
}
ScanMetrics metrics = scanner.getScanMetrics();
// We will return 9 rows, but also 2 cursors because we exceed the scan size limit partway
// through. So that accounts for 11 rpcs.
assertEquals(2, cursors);
assertEquals(11, metrics.countOfRPCcalls.get());
}
/**
After RegionScannerImpl.populateResults, row filters are run. If the row is excluded due to
filter.filterRowCells(), we fall through to a final results.isEmpty() check near the end of the
method. If results are empty at this point (which they are), nextRow() is called, which might
accumulate more block IO. This validates that we still honor block limits in that case.
*/
@Test
public void testCheckLimitAfterFilteringRowCells() throws IOException {
Table table = TEST_UTIL.getConnection().getTable(TABLE);
ResultScanner scanner = table
.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true).addColumn(FAMILY1, COLUMN1)
.setReadType(Scan.ReadType.STREAM).setFilter(new SingleColumnValueExcludeFilter(FAMILY1,
COLUMN1, CompareOperator.EQUAL, new BinaryComparator(DATA))));
// Since we use SingleColumnValueExcludeFilter and don't include any other columns, the column
// we load to test ends up being excluded from the result. So we only expect cursors here.
for (Result result : scanner) {
assertTrue(result.isCursor());
}
ScanMetrics metrics = scanner.getScanMetrics();
// Our filter causes us to read the first column of each row, then INCLUDE_AND_SEEK_NEXT_ROW.
// So we load 1 block per row, and there are 9 rows. Our max scan size is large enough to
// return 2 full blocks, with some overflow. So we are able to squeeze this all into 4 RPCs.
assertEquals(4, metrics.countOfRPCcalls.get());
}
/**
Tests that when we seek over blocks we don't include them in the block size of the request
*/
@Test
public void testSeekNextUsingHint() throws IOException {
Table table = TEST_UTIL.getConnection().getTable(TABLE);
ResultScanner scanner = table.getScanner(
getBaseScan().addFamily(FAMILY1).setFilter(new ColumnPaginationFilter(1, COLUMN5)));
scanner.next(100);
ScanMetrics metrics = scanner.getScanMetrics();
// We have to read the first cell/block of each row, then can skip to the last block. So that's
// 2 blocks per row to read (18 total). Our max scan size is enough to read 3 blocks per RPC,
// plus one final RPC to finish region.
assertEquals(7, metrics.countOfRPCcalls.get());
}
/**
We enable cursors and partial results to give us more granularity over counting of results, and
we enable STREAM so that no automatic switching from pread to stream occurs, which would throw
off the rpc counts.
*/
private Scan getBaseScan() {
return new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true)
.setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM);
}
}