From 39b912aae9ba1b7258d7b6d80e7502e7f9f9eede Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Thu, 18 Jan 2018 11:50:47 +0800 Subject: [PATCH] HBASE-19818 Scan time limit not work if the filter always filter row key --- .../hadoop/hbase/regionserver/HRegion.java | 36 ++++-- .../regionserver/NoLimitScannerContext.java | 12 +- .../hbase/regionserver/ScannerContext.java | 115 ++++++++++++++---- .../hbase/regionserver/StoreScanner.java | 1 - .../hbase/client/TestRawAsyncScanCursor.java | 10 +- .../TestScannerHeartbeatMessages.java | 66 +++++++--- 6 files changed, 170 insertions(+), 70 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c0ccc1dca14..ae0f3d1e3dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6390,7 +6390,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int initialBatchProgress = scannerContext.getBatchProgress(); long initialSizeProgress = scannerContext.getDataSizeProgress(); long initialHeapSizeProgress = scannerContext.getHeapSizeProgress(); - long initialTimeProgress = scannerContext.getTimeProgress(); + + // Used to check time limit + LimitScope limitScope = LimitScope.BETWEEN_CELLS; // The loop here is used only when at some point during the next we determine // that due to effects of filters or otherwise, we have an empty row in the result. @@ -6403,7 +6405,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (scannerContext.getKeepProgress()) { // Progress should be kept. Reset to initial values seen at start of method invocation. scannerContext.setProgress(initialBatchProgress, initialSizeProgress, - initialHeapSizeProgress, initialTimeProgress); + initialHeapSizeProgress); } else { scannerContext.clearProgress(); } @@ -6442,6 +6444,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS); + limitScope = LimitScope.BETWEEN_ROWS; + } + + if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { + if (hasFilterRow) { + throw new IncompatibleFilterException( + "Filter whose hasFilterRow() returns true is incompatible with scans that must " + + " stop mid-row because of a limit. ScannerContext:" + scannerContext); + } + return true; } // Check if we were getting data from the joinedHeap and hit the limit. @@ -6472,6 +6484,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } results.clear(); + + // Read nothing as the rowkey was filtered, but still need to check time limit + if (scannerContext.checkTimeLimit(limitScope)) { + return true; + } continue; } @@ -6498,16 +6515,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ret = filter.filterRowCellsWithRet(results); // We don't know how the results have changed after being filtered. Must set progress - // according to contents of results now. However, a change in the results should not - // affect the time progress. Thus preserve whatever time progress has been made - long timeProgress = scannerContext.getTimeProgress(); + // according to contents of results now. if (scannerContext.getKeepProgress()) { scannerContext.setProgress(initialBatchProgress, initialSizeProgress, - initialHeapSizeProgress, initialTimeProgress); + initialHeapSizeProgress); } else { scannerContext.clearProgress(); } - scannerContext.setTimeProgress(timeProgress); scannerContext.incrementBatchProgress(results.size()); for (Cell cell : results) { scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), @@ -6525,7 +6539,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // This row was totally filtered out, if this is NOT the last row, // we should continue on. Otherwise, nothing else to do. - if (!shouldStop) continue; + if (!shouldStop) { + // Read nothing as the cells was filtered, but still need to check time limit + if (scannerContext.checkTimeLimit(limitScope)) { + return true; + } + continue; + } return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java index ef1fb68b169..26d819214f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java @@ -68,17 +68,7 @@ public class NoLimitScannerContext extends ScannerContext { } @Override - void setTimeProgress(long timeProgress) { - // Do nothing. NoLimitScannerContext instances are immutable post-construction - } - - @Override - void updateTimeProgress() { - // Do nothing. NoLimitScannerContext instances are immutable post-construction - } - - @Override - void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) { + void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) { // Do nothing. NoLimitScannerContext instances are immutable post-construction } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 6b2267f8511..8aafa42acff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -51,11 +51,13 @@ import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; @InterfaceStability.Evolving public class ScannerContext { - /** - * Two sets of the same fields. One for the limits, another for the progress towards those limits - */ LimitFields limits; - LimitFields progress; + /** + * A different set of progress fields. Only include batch, dataSize and heapSize. Compare to + * LimitFields, ProgressFields doesn't contain time field. As we save a deadline in LimitFields, + * so use {@link System#currentTimeMillis()} directly when check time limit. + */ + ProgressFields progress; /** * The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)} @@ -104,10 +106,12 @@ public class ScannerContext { ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) { this.limits = new LimitFields(); - if (limitsToCopy != null) this.limits.copy(limitsToCopy); + if (limitsToCopy != null) { + this.limits.copy(limitsToCopy); + } // Progress fields are initialized to 0 - progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0); + progress = new ProgressFields(0, 0, 0); this.keepProgress = keepProgress; this.scannerState = DEFAULT_STATE; @@ -160,13 +164,6 @@ public class ScannerContext { progress.setHeapSize(curHeapSize + heapSize); } - /** - * Update the time progress with {@link System#currentTimeMillis()} - */ - void updateTimeProgress() { - progress.setTime(System.currentTimeMillis()); - } - int getBatchProgress() { return progress.getBatch(); } @@ -179,14 +176,9 @@ public class ScannerContext { return progress.getHeapSize(); } - long getTimeProgress() { - return progress.getTime(); - } - - void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) { + void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) { setBatchProgress(batchProgress); setSizeProgress(sizeProgress, heapSizeProgress); - setTimeProgress(timeProgress); } void setSizeProgress(long dataSizeProgress, long heapSizeProgress) { @@ -198,16 +190,12 @@ public class ScannerContext { progress.setBatch(batchProgress); } - void setTimeProgress(long timeProgress) { - progress.setTime(timeProgress); - } - /** * Clear away any progress that has been made so far. All progress fields are reset to initial * values */ void clearProgress() { - progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0); + progress.setFields(0, 0, 0); } /** @@ -319,7 +307,7 @@ public class ScannerContext { * @return true when the limit is enforceable from the checker's scope and it has been reached */ boolean checkTimeLimit(LimitScope checkerScope) { - return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime(); + return hasTimeLimit(checkerScope) && (System.currentTimeMillis() >= limits.getTime()); } /** @@ -690,4 +678,81 @@ public class ScannerContext { return sb.toString(); } } + + private static class ProgressFields { + + private static int DEFAULT_BATCH = -1; + private static long DEFAULT_SIZE = -1L; + + // The batch limit will always be enforced between cells, thus, there isn't a field to hold the + // batch scope + int batch = DEFAULT_BATCH; + + // The sum of cell data sizes(key + value). The Cell data might be in on heap or off heap area. + long dataSize = DEFAULT_SIZE; + // The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as + // such AND data cells of Cells which are in on heap area. + long heapSize = DEFAULT_SIZE; + + /** + * Fields keep their default values. + */ + ProgressFields() { + } + + ProgressFields(int batch, long size, long heapSize) { + setFields(batch, size, heapSize); + } + + /** + * Set all fields together. + */ + void setFields(int batch, long dataSize, long heapSize) { + setBatch(batch); + setDataSize(dataSize); + setHeapSize(heapSize); + } + + int getBatch() { + return this.batch; + } + + void setBatch(int batch) { + this.batch = batch; + } + + long getDataSize() { + return this.dataSize; + } + + long getHeapSize() { + return this.heapSize; + } + + void setDataSize(long dataSize) { + this.dataSize = dataSize; + } + + void setHeapSize(long heapSize) { + this.heapSize = heapSize; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + + sb.append("batch:"); + sb.append(batch); + + sb.append(", dataSize:"); + sb.append(dataSize); + + sb.append(", heapSize:"); + sb.append(heapSize); + + sb.append("}"); + return sb.toString(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7e682f93e45..0b9b547b8c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -552,7 +552,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner LOOP: do { // Update and check the time limit based on the configured value of cellsPerTimeoutCheck if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { - scannerContext.updateTimeProgress(); if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java index 0624a30e36a..04ffd7b8fb3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.concurrent.CompletableFuture; @@ -79,12 +80,9 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor { assertEquals(1, results.length); assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS); // we will always provide a scan cursor if time limit is reached. - if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) { - assertFalse(controller.cursor().isPresent()); - } else { - assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], - controller.cursor().get().getRow()); - } + assertTrue(controller.cursor().isPresent()); + assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], + controller.cursor().get().getRow()); assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow()); count++; } catch (Throwable e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index fb02d9d9300..11cc365a766 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -144,12 +144,6 @@ public class TestScannerHeartbeatMessages { /** * Make puts to put the input value into each combination of row, family, and qualifier - * @param rows - * @param families - * @param qualifiers - * @param value - * @return - * @throws IOException */ static ArrayList createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, byte[] value) throws IOException { @@ -189,8 +183,6 @@ public class TestScannerHeartbeatMessages { * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are * disabled, the test should throw an exception. - * @param testCallable - * @throws InterruptedException */ private void testImportanceOfHeartbeats(Callable testCallable) throws InterruptedException { HeartbeatRPCServices.heartbeatsEnabled = true; @@ -217,7 +209,6 @@ public class TestScannerHeartbeatMessages { /** * Test the case that the time limit for the scan is reached after each full row of cells is * fetched. - * @throws Exception */ @Test public void testHeartbeatBetweenRows() throws Exception { @@ -239,7 +230,6 @@ public class TestScannerHeartbeatMessages { /** * Test the case that the time limit for scans is reached in between column families - * @throws Exception */ @Test public void testHeartbeatBetweenColumnFamilies() throws Exception { @@ -263,7 +253,7 @@ public class TestScannerHeartbeatMessages { }); } - public static class SparseFilter extends FilterBase { + public static class SparseCellFilter extends FilterBase { @Override public ReturnCode filterCell(final Cell v) throws IOException { @@ -277,23 +267,39 @@ public class TestScannerHeartbeatMessages { } public static Filter parseFrom(final byte[] pbBytes) { - return new SparseFilter(); + return new SparseCellFilter(); + } + } + + public static class SparseRowFilter extends FilterBase { + + @Override + public boolean filterRowKey(Cell cell) throws IOException { + try { + Thread.sleep(CLIENT_TIMEOUT / 2 - 100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]); + } + + public static Filter parseFrom(final byte[] pbBytes) { + return new SparseRowFilter(); } } /** * Test the case that there is a filter which filters most of cells - * @throws Exception */ @Test - public void testHeartbeatWithSparseFilter() throws Exception { + public void testHeartbeatWithSparseCellFilter() throws Exception { testImportanceOfHeartbeats(new Callable() { @Override public Void call() throws Exception { Scan scan = new Scan(); scan.setMaxResultSize(Long.MAX_VALUE); scan.setCaching(Integer.MAX_VALUE); - scan.setFilter(new SparseFilter()); + scan.setFilter(new SparseCellFilter()); ResultScanner scanner = TABLE.getScanner(scan); int num = 0; while (scanner.next() != null) { @@ -305,7 +311,7 @@ public class TestScannerHeartbeatMessages { scan = new Scan(); scan.setMaxResultSize(Long.MAX_VALUE); scan.setCaching(Integer.MAX_VALUE); - scan.setFilter(new SparseFilter()); + scan.setFilter(new SparseCellFilter()); scan.setAllowPartialResults(true); scanner = TABLE.getScanner(scan); num = 0; @@ -320,6 +326,31 @@ public class TestScannerHeartbeatMessages { }); } + /** + * Test the case that there is a filter which filters most of rows + */ + @Test + public void testHeartbeatWithSparseRowFilter() throws Exception { + testImportanceOfHeartbeats(new Callable() { + @Override + public Void call() throws Exception { + Scan scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + scan.setFilter(new SparseRowFilter()); + ResultScanner scanner = TABLE.getScanner(scan); + int num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(1, num); + scanner.close(); + + return null; + } + }); + } + /** * Test the equivalence of a scan versus the same scan executed when heartbeat messages are * necessary @@ -328,7 +359,6 @@ public class TestScannerHeartbeatMessages { * @param cfSleepTime The time to sleep between fetches of column family cells * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for * that column family are fetched - * @throws Exception */ private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) throws Exception { @@ -361,8 +391,6 @@ public class TestScannerHeartbeatMessages { /** * Helper method for setting the time to sleep between rows and column families. If a sleep time * is negative then that sleep will be disabled - * @param rowSleepTime - * @param cfSleepTime */ private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) { HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;