diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index cc680173a4e..2119a3e7cbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Optional; +import java.util.function.IntConsumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; @@ -277,6 +278,11 @@ public class HalfStoreFileReader extends StoreFileReader { public void shipped() throws IOException { this.delegate.shipped(); } + + @Override + public void recordBlockSize(IntConsumer blockSizeConsumer) { + this.delegate.recordBlockSize(blockSizeConsumer); + } }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 5918c76ffcd..96d808ad2cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Optional; +import java.util.function.IntConsumer; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -336,6 +337,9 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { // RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the // unreferenced block please. protected HFileBlock curBlock; + // Whether we returned a result for curBlock's size in recordBlockSize(). + // gets reset whenever curBlock is changed. + private boolean providedCurrentBlockSize = false; // Previous blocks that were used in the course of the read protected final ArrayList prevBlocks = new ArrayList<>(); @@ -355,6 +359,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { prevBlocks.add(this.curBlock); } this.curBlock = block; + this.providedCurrentBlockSize = false; } void reset() { @@ -415,6 +420,14 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { this.returnBlocks(true); } + @Override + public void recordBlockSize(IntConsumer blockSizeConsumer) { + if (!providedCurrentBlockSize && curBlock != null) { + providedCurrentBlockSize = true; + blockSizeConsumer.accept(curBlock.getUncompressedSizeWithoutHeader()); + } + } + // Returns the #bytes in HFile for the current cell. Used to skip these many bytes in current // HFile block's buffer so as to position to the next cell. 
private int getCurCellSerializedSize() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index fd5c66b126b..0393d3b788a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.function.IntConsumer; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.regionserver.Shipper; import org.apache.yetus.audience.InterfaceAudience; @@ -140,4 +141,11 @@ public interface HFileScanner extends Shipper, Closeable { */ @Override void close(); + + /** + * Record the size of the current block in bytes, passing as an argument to the blockSizeConsumer. + * Implementations should ensure that blockSizeConsumer is only called once per block. + * @param blockSizeConsumer to be called with block size in bytes, once per block. + */ + void recordBlockSize(IntConsumer blockSizeConsumer); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 1fe80bc58b0..9f42e7ce2ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; +import java.util.function.IntConsumer; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; @@ -104,6 +105,11 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner return !this.current.isFileScanner(); } + @Override + public void recordBlockSize(IntConsumer blockSizeConsumer) { + this.current.recordBlockSize(blockSizeConsumer); + } + @Override public Cell next() throws IOException { if (this.current == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index d90cf78dda5..c3b60792fb6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.Closeable; import java.io.IOException; +import java.util.function.IntConsumer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; @@ -125,6 +126,13 @@ public interface KeyValueScanner extends Shipper, Closeable { /** Returns true if this is a file scanner. Otherwise a memory scanner is assumed. */ boolean isFileScanner(); + /** + * Record the size of the current block in bytes, passing as an argument to the blockSizeConsumer. + * Implementations should ensure that blockSizeConsumer is only called once per block. + * @param blockSizeConsumer to be called with block size in bytes, once per block. + */ + void recordBlockSize(IntConsumer blockSizeConsumer); + /** * @return the file path if this is a file scanner, otherwise null. 
* @see #isFileScanner() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java index 02d4d85d7e1..8f1898a3c65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.function.IntConsumer; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -63,6 +64,11 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner { return false; } + @Override + public void recordBlockSize(IntConsumer blockSizeConsumer) { + // do nothing + } + @Override public Path getFilePath() { // Not a file by default. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index f83c2c6b788..0777d25c158 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; -import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -3312,8 +3311,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin // return whether we have more results in region. private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, long maxQuotaResultSize, int maxResults, int limitOfRows, List results, - ScanResponse.Builder builder, MutableObject lastBlock, RpcCall rpcCall) - throws IOException { + ScanResponse.Builder builder, RpcCall rpcCall) throws IOException { HRegion region = rsh.r; RegionScanner scanner = rsh.s; long maxResultSize; @@ -3373,7 +3371,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); // maxResultSize - either we can reach this much size for all cells(being read) data or sum // of heap size occupied by cells(being read). Cell data means its key and value parts. - contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize); + // maxQuotaResultSize - max results just from server side configuration and quotas, without + // user's specified max. We use this for evaluating limits based on blocks (not cells). + // We may have accumulated some results in coprocessor preScannerNext call. We estimate + // block and cell size of those using call to addSize. Update our maximums for scanner + // context so we can account for them in the real scan. 
+ long maxCellSize = maxResultSize; + long maxBlockSize = maxQuotaResultSize; + if (rpcCall != null) { + maxBlockSize -= rpcCall.getResponseBlockSize(); + maxCellSize -= rpcCall.getResponseCellSize(); + } + + contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize); contextBuilder.setBatchLimit(scanner.getBatch()); contextBuilder.setTimeLimit(timeScope, timeLimit); contextBuilder.setTrackMetrics(trackMetrics); @@ -3428,7 +3438,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin } boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); - lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue())); results.add(r); numOfResults++; if (!mayHaveMoreCellsInRow && limitOfRows > 0) { @@ -3457,12 +3466,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached; if (limitReached || !moreRows) { + // With block size limit, we may exceed size limit without collecting any results. + // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat + // or cursor if results were collected, for example for cell size or heap size limits. + boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty(); // We only want to mark a ScanResponse as a heartbeat message in the event that // there are more values to be read server side. If there aren't more values, // marking it as a heartbeat is wasteful because the client will need to issue // another ScanRequest only to realize that they already have all the values - if (moreRows && timeLimitReached) { - // Heartbeat messages occur when the time limit has been reached. + if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) { + // Heartbeat messages occur when the time limit has been reached, or size limit has + // been reached before collecting any results. This can happen for heavily filtered + // scans which scan over too many blocks. builder.setHeartbeatMessage(true); if (rsh.needCursor) { Cell cursorCell = scannerContext.getLastPeekedCell(); @@ -3475,6 +3490,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin } values.clear(); } + if (rpcCall != null) { + rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress()); + rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress()); + } builder.setMoreResultsInRegion(moreRows); // Check to see if the client requested that we track metrics server side. If the // client requested metrics, retrieve the metrics from the scanner context. @@ -3636,7 +3655,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin } else { limitOfRows = -1; } - MutableObject lastBlock = new MutableObject<>(); boolean scannerClosed = false; try { List results = new ArrayList<>(Math.min(rows, 512)); @@ -3646,8 +3664,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin if (region.getCoprocessorHost() != null) { Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); if (!results.isEmpty()) { + // If scanner CP added results to list, we want to account for cell and block size of + // that work. We estimate this using addSize, since CP does not get ScannerContext. 
If + // !done, the actual scan call below will use more accurate ScannerContext block and + // cell size tracking for the rest of the request. The two result sets will be added + // together in the RpcCall accounting. + // This is just an estimate (see addSize for more details on estimation). We don't + // pass lastBlock to the scan call below because the real scan uses ScannerContext, + // which does not use lastBlock tracking. This may result in over counting by 1 block, + // but that is unlikely since addSize is already a rough estimate. + Object lastBlock = null; for (Result r : results) { - lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue())); + lastBlock = addSize(rpcCall, r, lastBlock); } } if (bypass != null && bypass.booleanValue()) { @@ -3656,7 +3684,7 @@ } if (!done) { scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, - results, builder, lastBlock, rpcCall); + results, builder, rpcCall); } else { builder.setMoreResultsInRegion(!results.isEmpty()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index d010d71f6cf..bab470d7534 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -500,7 +500,8 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { results.clear(); // Read nothing as the rowkey was filtered, but still need to check time limit - if (scannerContext.checkTimeLimit(limitScope)) { + // We also check size limit because we might have read blocks in getting to this point. + if (scannerContext.checkAnyLimitReached(limitScope)) { return true; } continue; } @@ -558,8 +559,9 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { // This row was totally filtered out, if this is NOT the last row, // we should continue on. Otherwise, nothing else to do. if (!shouldStop) { - // Read nothing as the cells was filtered, but still need to check time limit - if (scannerContext.checkTimeLimit(limitScope)) { + // Read nothing as the cells were filtered, but still need to check time limit. + // We also check size limit because we might have read blocks in getting to this point. + if (scannerContext.checkAnyLimitReached(limitScope)) { return true; } continue; } @@ -605,6 +607,13 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } if (!shouldStop) { + // We check size limit because we might have read blocks in the nextRow call above, or + // in the populateResults call. Only scans with hasFilterRow should reach this point, + // and for those scans which filter row _cells_ this is the only place we can actually + // enforce that the scan does not exceed limits since it bypasses all other checks above.
+ if (scannerContext.checkSizeLimit(limitScope)) { + return true; + } continue; } } @@ -702,13 +711,21 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException { assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; + + // Enable skipping row mode, which disables limits and skips tracking progress for all + // but block size. We keep tracking block size because skipping a row in this way + // might involve reading blocks along the way. + scannerContext.setSkippingRow(true); + Cell next; while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) { // Check for thread interrupt status in case we have been signaled from // #interruptRegionOperation. region.checkInterrupt(); - this.storeHeap.next(MOCKED_LIST); + this.storeHeap.next(MOCKED_LIST, scannerContext); } + + scannerContext.setSkippingRow(false); resetFilters(); // Calling the hook in CP which allows it to do a fast forward diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index ddebe83331d..09945ca2303 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -103,6 +103,13 @@ public class ScannerContext { boolean keepProgress; private static boolean DEFAULT_KEEP_PROGRESS = false; + /** + * Allows temporarily ignoring limits and skipping tracking of batch and size progress. Used when + * skipping to the next row, in which case all processed cells are thrown away so should not count + * towards progress. + */ + boolean skippingRow = false; + private Cell lastPeekedCell = null; // Set this to true will have the same behavior with reaching the time limit. @@ -123,7 +130,7 @@ public class ScannerContext { } // Progress fields are initialized to 0 - progress = new ProgressFields(0, 0, 0); + progress = new ProgressFields(0, 0, 0, 0); this.keepProgress = keepProgress; this.scannerState = DEFAULT_STATE; @@ -148,7 +155,9 @@ public class ScannerContext { * @return true if the progress tracked so far in this instance will be considered during an * invocation of {@link InternalScanner#next(java.util.List)} or * {@link RegionScanner#next(java.util.List)}. false when the progress tracked so far - * should not be considered and should instead be wiped away via {@link #clearProgress()} + * should not be considered and should instead be wiped away via {@link #clearProgress()}. + * This only applies to per-row progress, like batch and data/heap size. Block size is + * never reset because it tracks all of the blocks scanned for an entire request. */ boolean getKeepProgress() { return keepProgress; } @@ -158,10 +167,32 @@ public class ScannerContext { this.keepProgress = keepProgress; } + /** + * In this mode, only block size progress is tracked, and limits are ignored. We set this mode + * when skipping to the next row, in which case all cells returned are thrown away so should not + * count towards progress. + * @return true if we are in skipping row mode. + */ + public boolean getSkippingRow() { + return skippingRow; + } + + /** + * @param skippingRow set true to cause disabling of collecting per-cell progress or enforcing any + * limits.
This is used when trying to skip over all cells in a row, in which + * case those cells are thrown away so should not count towards progress. + */ + void setSkippingRow(boolean skippingRow) { + this.skippingRow = skippingRow; + } + /** * Progress towards the batch limit has been made. Increment internal tracking of batch progress */ void incrementBatchProgress(int batch) { + if (skippingRow) { + return; + } int currentBatch = progress.getBatch(); progress.setBatch(currentBatch + batch); } @@ -170,6 +201,9 @@ public class ScannerContext { * Progress towards the size limit has been made. Increment internal tracking of size progress */ void incrementSizeProgress(long dataSize, long heapSize) { + if (skippingRow) { + return; + } long curDataSize = progress.getDataSize(); progress.setDataSize(curDataSize + dataSize); long curHeapSize = progress.getHeapSize(); @@ -182,6 +216,17 @@ public class ScannerContext { */ @Deprecated void updateTimeProgress() { + + } + + /** + * Progress towards the block limit has been made. Increment internal track of block progress + */ + void incrementBlockProgress(int blockSize) { + if (blockSize > 0) { + long curBlockSize = progress.getBlockSize(); + progress.setBlockSize(curBlockSize + blockSize); + } } int getBatchProgress() { @@ -212,6 +257,10 @@ public class ScannerContext { setProgress(batchProgress, sizeProgress, heapSizeProgress); } + long getBlockSizeProgress() { + return progress.getBlockSize(); + } + void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) { setBatchProgress(batchProgress); setSizeProgress(sizeProgress, heapSizeProgress); @@ -235,10 +284,12 @@ public class ScannerContext { /** * Clear away any progress that has been made so far. All progress fields are reset to initial - * values + * values. Only clears progress that should reset between rows. {@link #getBlockSizeProgress()} is + * not reset because it increments for all blocks scanned whether the result is included or + * filtered. 
*/ void clearProgress() { - progress.setFields(0, 0, 0); + progress.setFields(0, 0, 0, getBlockSizeProgress()); } /** @@ -275,7 +326,7 @@ public class ScannerContext { /** Returns true if the size limit can be enforced in the checker's scope */ boolean hasSizeLimit(LimitScope checkerScope) { return limits.canEnforceSizeLimitFromScope(checkerScope) - && (limits.getDataSize() > 0 || limits.getHeapSize() > 0); + && (limits.getDataSize() > 0 || limits.getHeapSize() > 0 || limits.getBlockSize() > 0); } /** Returns true if the time limit can be enforced in the checker's scope */ @@ -320,7 +371,7 @@ public class ScannerContext { * @return true when the limit is enforceable from the checker's scope and it has been reached */ boolean checkBatchLimit(LimitScope checkerScope) { - return hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch(); + return !skippingRow && hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch(); } /** @@ -328,8 +379,10 @@ public class ScannerContext { * @return true when the limit is enforceable from the checker's scope and it has been reached */ boolean checkSizeLimit(LimitScope checkerScope) { - return hasSizeLimit(checkerScope) && (progress.getDataSize() >= limits.getDataSize() - || progress.getHeapSize() >= limits.getHeapSize()); + return !skippingRow && hasSizeLimit(checkerScope) + && (progress.getDataSize() >= limits.getDataSize() + || progress.getHeapSize() >= limits.getHeapSize() + || progress.getBlockSize() >= limits.getBlockSize()); } /** @@ -338,7 +391,7 @@ public class ScannerContext { * @return true when the limit is enforceable from the checker's scope and it has been reached */ boolean checkTimeLimit(LimitScope checkerScope) { - return hasTimeLimit(checkerScope) + return !skippingRow && hasTimeLimit(checkerScope) && (returnImmediately || EnvironmentEdgeManager.currentTime() >= limits.getTime()); } @@ -414,10 +467,12 @@ public class ScannerContext { return this; } - public Builder setSizeLimit(LimitScope sizeScope, long dataSizeLimit, long heapSizeLimit) { + public Builder setSizeLimit(LimitScope sizeScope, long dataSizeLimit, long heapSizeLimit, + long blockSizeLimit) { limits.setDataSize(dataSizeLimit); limits.setHeapSize(heapSizeLimit); limits.setSizeScope(sizeScope); + limits.setBlockSize(blockSizeLimit); return this; } @@ -563,6 +618,9 @@ public class ScannerContext { // The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as // such AND data cells of Cells which are in on heap area. long heapSize = DEFAULT_SIZE; + // The total amount of block bytes that have been loaded in order to process cells for the + // request. + long blockSize = DEFAULT_SIZE; LimitScope timeScope = DEFAULT_SCOPE; long time = DEFAULT_TIME; @@ -576,19 +634,21 @@ public class ScannerContext { void copy(LimitFields limitsToCopy) { if (limitsToCopy != null) { setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getDataSize(), - limitsToCopy.getHeapSize(), limitsToCopy.getTimeScope(), limitsToCopy.getTime()); + limitsToCopy.getHeapSize(), limitsToCopy.getBlockSize(), limitsToCopy.getTimeScope(), + limitsToCopy.getTime()); } } /** * Set all fields together. 
*/ - void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize, + void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize, long blockSize, LimitScope timeScope, long time) { setBatch(batch); setSizeScope(sizeScope); setDataSize(dataSize); setHeapSize(heapSize); + setBlockSize(blockSize); setTimeScope(timeScope); setTime(time); } @@ -614,6 +674,10 @@ public class ScannerContext { return this.heapSize; } + long getBlockSize() { + return this.blockSize; + } + void setDataSize(long dataSize) { this.dataSize = dataSize; } @@ -622,6 +686,10 @@ public class ScannerContext { this.heapSize = heapSize; } + void setBlockSize(long blockSize) { + this.blockSize = blockSize; + } + /** Returns {@link LimitScope} indicating scope in which the size limit is enforced */ LimitScope getSizeScope() { return this.sizeScope; @@ -678,6 +746,9 @@ public class ScannerContext { sb.append(", heapSize:"); sb.append(heapSize); + sb.append(", blockSize:"); + sb.append(blockSize); + sb.append(", sizeScope:"); sb.append(sizeScope); @@ -706,18 +777,22 @@ public class ScannerContext { // The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as // such AND data cells of Cells which are in on heap area. long heapSize = DEFAULT_SIZE; + // The total amount of block bytes that have been loaded in order to process cells for the + // request. + long blockSize = DEFAULT_SIZE; - ProgressFields(int batch, long size, long heapSize) { - setFields(batch, size, heapSize); + ProgressFields(int batch, long size, long heapSize, long blockSize) { + setFields(batch, size, heapSize, blockSize); } /** * Set all fields together. */ - void setFields(int batch, long dataSize, long heapSize) { + void setFields(int batch, long dataSize, long heapSize, long blockSize) { setBatch(batch); setDataSize(dataSize); setHeapSize(heapSize); + setBlockSize(blockSize); } int getBatch() { @@ -736,10 +811,18 @@ public class ScannerContext { return this.heapSize; } + long getBlockSize() { + return this.blockSize; + } + void setDataSize(long dataSize) { this.dataSize = dataSize; } + void setBlockSize(long blockSize) { + this.blockSize = blockSize; + } + void setHeapSize(long heapSize) { this.heapSize = heapSize; } @@ -758,6 +841,9 @@ public class ScannerContext { sb.append(", heapSize:"); sb.append(heapSize); + sb.append(", blockSize:"); + sb.append(blockSize); + sb.append("}"); return sb.toString(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index c5dbca6b6e2..1d28c55570e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.Iterator; import java.util.SortedSet; +import java.util.function.IntConsumer; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -286,6 +287,11 @@ public class SegmentScanner implements KeyValueScanner { return false; } + @Override + public void recordBlockSize(IntConsumer blockSizeConsumer) { + // do nothing + } + @Override public Path getFilePath() { return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 74147f8ec05..5e666659c02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Optional; import java.util.PriorityQueue; import java.util.concurrent.atomic.LongAdder; +import java.util.function.IntConsumer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -450,6 +451,11 @@ public class StoreFileScanner implements KeyValueScanner { return true; } + @Override + public void recordBlockSize(IntConsumer blockSizeConsumer) { + hfs.recordBlockSize(blockSizeConsumer); + } + @Override public Path getFilePath() { return reader.getHFileReader().getPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index bf95289440f..45fdb3e70a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -569,7 +569,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } // Clear progress away unless invoker has indicated it should be kept. - if (!scannerContext.getKeepProgress()) { + if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) { scannerContext.clearProgress(); } @@ -611,6 +611,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // here, we still need to scan all the qualifiers before returning... scannerContext.returnImmediately(); } + + heap.recordBlockSize(scannerContext::incrementBlockProgress); + prevCell = cell; scannerContext.setLastPeekedCell(cell); topChanged = false; @@ -749,6 +752,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner default: throw new RuntimeException("UNEXPECTED"); } + + // One last chance to break due to size limit. The INCLUDE* cases above already check + // limit and continue. For the various filtered cases, we need to check because block + // size limit may have been exceeded even if we don't add cells to result list. 
+ if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + } } while ((cell = this.heap.peek()) != null); if (count > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java index b85c5edb9b6..42f2cf5ebd9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -81,6 +83,39 @@ public class TestHFileReaderImpl { return ncTFile; } + /** + * Test that we only count block size once per block while scanning + */ + @Test + public void testRecordBlockSize() throws IOException { + Path p = makeNewFile(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration conf = TEST_UTIL.getConfiguration(); + HFile.Reader reader = HFile.createReader(fs, p, CacheConfig.DISABLED, true, conf); + + try (HFileReaderImpl.HFileScannerImpl scanner = + (HFileReaderImpl.HFileScannerImpl) reader.getScanner(conf, true, true, false)) { + scanner.seekTo(); + + scanner.recordBlockSize( + size -> assertTrue("expected non-zero block size on first request", size > 0)); + scanner.recordBlockSize( + size -> assertEquals("expected zero block size on second request", 0, (int) size)); + + AtomicInteger blocks = new AtomicInteger(0); + while (scanner.next()) { + scanner.recordBlockSize(size -> { + blocks.incrementAndGet(); + // there's only 2 cells in the second block + assertTrue("expected remaining block to be less than block size", + size < toKV("a").getLength() * 3); + }); + } + + assertEquals("expected only one remaining block but got " + blocks.get(), 1, blocks.get()); + } + } + @Test public void testSeekBefore() throws Exception { Path p = makeNewFile(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java index 373e138a764..b47184390e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.function.IntConsumer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Scan; @@ -113,4 +114,9 @@ public class DelegatingKeyValueScanner implements KeyValueScanner { public Cell getNextIndexedKey() { return delegate.getNextIndexedKey(); } + + @Override + public void recordBlockSize(IntConsumer blockSizeConsumer) { + delegate.recordBlockSize(blockSizeConsumer); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java new file mode 100644 index 00000000000..218db3d618d --- /dev/null +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.ColumnPaginationFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; +import org.apache.hadoop.hbase.filter.SkipFilter; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ LargeTests.class }) +public class TestScannerBlockSizeLimits { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestScannerBlockSizeLimits.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final TableName TABLE = TableName.valueOf("TestScannerBlockSizeLimits"); + private static final byte[] FAMILY1 = Bytes.toBytes("0"); + private static final byte[] FAMILY2 = Bytes.toBytes("1"); + + private static final byte[] DATA = new byte[1000]; + private static final byte[][] FAMILIES = new byte[][] { FAMILY1, FAMILY2 }; + + private static final byte[] COLUMN1 = Bytes.toBytes(0); + private static final byte[] COLUMN2 = Bytes.toBytes(1); + private static final byte[] COLUMN3 = Bytes.toBytes(2); + private static final byte[] COLUMN4 = Bytes.toBytes(4); + private static final byte[] COLUMN5 = Bytes.toBytes(5); + + private static final byte[][] COLUMNS = new byte[][] { COLUMN1, COLUMN2 }; + + @BeforeClass + public static void setUp() throws Exception { + Configuration 
conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4200); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE, FAMILIES, 1, 2048); + createTestData(); + } + + private static void createTestData() throws IOException, InterruptedException { + RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE); + String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); + HRegion region = TEST_UTIL.getRSForFirstRegionInTable(TABLE).getRegion(regionName); + + for (int i = 1; i < 10; i++) { + // 5 columns per row, in 2 families + // Each column value is 1000 bytes, which is enough to fill a full block with row and header. + // So 5 blocks per row. + Put put = new Put(Bytes.toBytes(i)); + for (int j = 0; j < 6; j++) { + put.addColumn(FAMILY1, Bytes.toBytes(j), DATA); + } + + put.addColumn(FAMILY2, COLUMN1, DATA); + + region.put(put); + + if (i % 2 == 0) { + region.flush(true); + } + } + + // we've created 10 storefiles at this point, 5 per family + region.flush(true); + + } + + /** + * Simplest test that ensures we don't count block sizes too much. These 2 requested cells are in + * the same block, so should be returned in 1 request. If we mis-counted blocks, it'd come in 2 + * requests. + */ + @Test + public void testSingleBlock() throws IOException { + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = + table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1)).withStopRow(Bytes.toBytes(2)) + .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM)); + + ScanMetrics metrics = scanner.getScanMetrics(); + + scanner.next(100); + + assertEquals(1, metrics.countOfRowsScanned.get()); + assertEquals(1, metrics.countOfRPCcalls.get()); + } + + /** + * Tests that we check size limit after filterRowKey. When filterRowKey, we call nextRow to skip + * to next row. This should be efficient in this case, but we still need to check size limits + * after each row is processed. So in this test, we accumulate some block IO reading row 1, then + * skip row 2 and should return early at that point. The next rpc call starts with row3 blocks + * loaded, so can return the whole row in one rpc. If we were not checking size limits, we'd have + * been able to load an extra row 3 cell into the first rpc and thus split row 3 across multiple + * Results. + */ + @Test + public void testCheckLimitAfterFilterRowKey() throws IOException { + + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = table.getScanner(getBaseScan().addColumn(FAMILY1, COLUMN1) + .addColumn(FAMILY1, COLUMN2).addColumn(FAMILY1, COLUMN3).addFamily(FAMILY2) + .setFilter(new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(2))))); + + boolean foundRow3 = false; + for (Result result : scanner) { + Set rows = new HashSet<>(); + for (Cell cell : result.rawCells()) { + rows.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + } + if (rows.contains(3)) { + assertFalse("expected row3 to come all in one result, but found it in two results", + foundRow3); + assertEquals(1, rows.size()); + foundRow3 = true; + } + } + ScanMetrics metrics = scanner.getScanMetrics(); + + // 4 blocks per row, so 36 blocks. We can scan 3 blocks per RPC, which is 12 RPCs. But we can + // skip 1 row, so skip 2 RPCs. 
+ assertEquals(10, metrics.countOfRPCcalls.get()); + } + + /** + * After RegionScannerImpl.populateResults, row filters are run. If row is excluded due to + * filter.filterRow(), nextRow() is called which might accumulate more block IO. Validates that in + * this case we still honor block limits. + */ + @Test + public void testCheckLimitAfterFilteringRowCellsDueToFilterRow() throws IOException { + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true) + .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM) + .setFilter(new SkipFilter(new QualifierFilter(CompareOperator.EQUAL, + new BinaryComparator(Bytes.toBytes("dasfasf")))))); + + // Our filter doesn't end up matching any real columns, so expect only cursors + for (Result result : scanner) { + assertTrue(result.isCursor()); + } + + ScanMetrics metrics = scanner.getScanMetrics(); + + // scanning over 9 rows, filtering on 2 contiguous columns each, so 9 blocks total + // limited to 4200 bytes per which is enough for 3 blocks (exceed limit after loading 3rd) + // so that's 3 RPC and the last RPC pulls the cells loaded by the last block + assertEquals(4, metrics.countOfRPCcalls.get()); + } + + /** + * At the end of the loop in StoreScanner, we do one more check of size limits. This is to catch + * block size being exceeded while filtering cells within a store. Test to ensure that we do that, + * otherwise we'd see no cursors below. + */ + @Test + public void testCheckLimitAfterFilteringCell() throws IOException { + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = table.getScanner(getBaseScan().setCaching(1) + .setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(COLUMN2)))); + + int cursors = 0; + for (Result result : scanner) { + if (result.isCursor()) { + cursors++; + } + } + ScanMetrics metrics = scanner.getScanMetrics(); + + // We will return 9 rows, but also 2 cursors because we exceed the scan size limit partway + // through. So that accounts for 11 rpcs. + assertEquals(2, cursors); + assertEquals(11, metrics.countOfRPCcalls.get()); + } + + /** + * After RegionScannerImpl.populateResults, row filters are run. If row is excluded due to + * filter.filterRowCells(), we fall through to a final results.isEmpty() check near the end of the + * method. If results are empty at this point (which they are), nextRow() is called which might + * accumulate more block IO. Validates that in this case we still honor block limits. + */ + @Test + public void testCheckLimitAfterFilteringRowCells() throws IOException { + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = table + .getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true).addColumn(FAMILY1, COLUMN1) + .setReadType(Scan.ReadType.STREAM).setFilter(new SingleColumnValueExcludeFilter(FAMILY1, + COLUMN1, CompareOperator.EQUAL, new BinaryComparator(DATA)))); + + // Since we use SingleColumnValueExcludeFilter and dont include any other columns, the column + // we load to test ends up being excluded from the result. So we only expect cursors here. + for (Result result : scanner) { + assertTrue(result.isCursor()); + } + + ScanMetrics metrics = scanner.getScanMetrics(); + + // Our filter causes us to read the first column of each row, then INCLUDE_AND_SEEK_NEXT_ROW. + // So we load 1 block per row, and there are 9 rows. 
Our max scan size is large enough to + // return 2 full blocks, with some overflow. So we are able to squeeze this all into 4 RPCs. + assertEquals(4, metrics.countOfRPCcalls.get()); + } + + /** + * Tests that when we seek over blocks we don't include them in the block size of the request + */ + @Test + public void testSeekNextUsingHint() throws IOException { + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = table.getScanner( + getBaseScan().addFamily(FAMILY1).setFilter(new ColumnPaginationFilter(1, COLUMN5))); + + scanner.next(100); + ScanMetrics metrics = scanner.getScanMetrics(); + + // We have to read the first cell/block of each row, then can skip to the last block. So that's + // 2 blocks per row to read (18 total). Our max scan size is enough to read 3 blocks per RPC, + // plus one final RPC to finish region. + assertEquals(7, metrics.countOfRPCcalls.get()); + } + + /** + * We enable cursors and partial results to give us more granularity over counting of results, and + * we enable STREAM so that no auto switching from pread to stream occurs -- this throws off the + * rpc counts. + */ + private Scan getBaseScan() { + return new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true) + .setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM); + } +}
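A minimal sketch (not part of the patch, names hypothetical) of the recordBlockSize(IntConsumer) contract that HFileScanner and KeyValueScanner gain above: a scanner reports each underlying block to the consumer at most once, so a caller can simply sum the reported sizes to get the total bytes of blocks opened so far. In the patch itself the consumer wired in by StoreScanner is scannerContext::incrementBlockProgress; the accumulator below only illustrates the same pattern.

import java.util.function.IntConsumer;

// Hypothetical accumulator illustrating the once-per-block contract of
// KeyValueScanner#recordBlockSize: invoking the consumer after every cell is safe
// because a block that has already been reported is not reported again, so the
// running sum never double counts cells that share a block.
final class BlockBytesAccumulator {
  private long totalBlockBytes;

  // Hand this to recordBlockSize, e.g. scanner.recordBlockSize(accumulator.consumer());
  IntConsumer consumer() {
    return size -> totalBlockBytes += size;
  }

  long total() {
    return totalBlockBytes;
  }
}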
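The size-limit setup in RSRpcServices#scan above derives two budgets before building the ScannerContext: a cell budget from maxResultSize and a block budget from maxQuotaResultSize, each reduced by whatever the coprocessor preScannerNext estimate already charged to the RpcCall via addSize. A small worked sketch of that arithmetic with purely illustrative numbers (not taken from the patch or from HBase defaults):

public final class ScanSizeLimitMath {
  public static void main(String[] args) {
    // Illustrative inputs only: a 2 MB cell-size maximum, a 64 MB quota-derived maximum,
    // and ~3 KB of cells plus one ~8 KB block already charged to the RpcCall.
    long maxResultSize = 2L * 1024 * 1024;
    long maxQuotaResultSize = 64L * 1024 * 1024;
    long responseCellSize = 3L * 1024;
    long responseBlockSize = 8L * 1024;

    long maxCellSize = maxResultSize - responseCellSize;       // remaining cell/heap budget
    long maxBlockSize = maxQuotaResultSize - responseBlockSize; // remaining block budget

    // The patch then passes these into the scanner context, roughly:
    // contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
    System.out.println("cell budget=" + maxCellSize + " bytes, block budget=" + maxBlockSize + " bytes");
  }
}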