HBASE-19818 Scan time limit not work if the filter always filter row key

This commit is contained in:
Guanghao Zhang 2018-01-18 11:50:47 +08:00
parent 01c34243fe
commit 39b912aae9
6 changed files with 170 additions and 70 deletions

View File

@ -6390,7 +6390,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int initialBatchProgress = scannerContext.getBatchProgress(); int initialBatchProgress = scannerContext.getBatchProgress();
long initialSizeProgress = scannerContext.getDataSizeProgress(); long initialSizeProgress = scannerContext.getDataSizeProgress();
long initialHeapSizeProgress = scannerContext.getHeapSizeProgress(); long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
long initialTimeProgress = scannerContext.getTimeProgress();
// Used to check time limit
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
// The loop here is used only when at some point during the next we determine // The loop here is used only when at some point during the next we determine
// that due to effects of filters or otherwise, we have an empty row in the result. // that due to effects of filters or otherwise, we have an empty row in the result.
@ -6403,7 +6405,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (scannerContext.getKeepProgress()) { if (scannerContext.getKeepProgress()) {
// Progress should be kept. Reset to initial values seen at start of method invocation. // Progress should be kept. Reset to initial values seen at start of method invocation.
scannerContext.setProgress(initialBatchProgress, initialSizeProgress, scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialHeapSizeProgress, initialTimeProgress); initialHeapSizeProgress);
} else { } else {
scannerContext.clearProgress(); scannerContext.clearProgress();
} }
@ -6442,6 +6444,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS); scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
limitScope = LimitScope.BETWEEN_ROWS;
}
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
if (hasFilterRow) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scans that must " +
" stop mid-row because of a limit. ScannerContext:" + scannerContext);
}
return true;
} }
// Check if we were getting data from the joinedHeap and hit the limit. // Check if we were getting data from the joinedHeap and hit the limit.
@ -6472,6 +6484,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
results.clear(); results.clear();
// Read nothing as the rowkey was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
return true;
}
continue; continue;
} }
@ -6498,16 +6515,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ret = filter.filterRowCellsWithRet(results); ret = filter.filterRowCellsWithRet(results);
// We don't know how the results have changed after being filtered. Must set progress // We don't know how the results have changed after being filtered. Must set progress
// according to contents of results now. However, a change in the results should not // according to contents of results now.
// affect the time progress. Thus preserve whatever time progress has been made
long timeProgress = scannerContext.getTimeProgress();
if (scannerContext.getKeepProgress()) { if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress, scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialHeapSizeProgress, initialTimeProgress); initialHeapSizeProgress);
} else { } else {
scannerContext.clearProgress(); scannerContext.clearProgress();
} }
scannerContext.setTimeProgress(timeProgress);
scannerContext.incrementBatchProgress(results.size()); scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) { for (Cell cell : results) {
scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
@ -6525,7 +6539,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This row was totally filtered out, if this is NOT the last row, // This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do. // we should continue on. Otherwise, nothing else to do.
if (!shouldStop) continue; if (!shouldStop) {
// Read nothing as the cells was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
return true;
}
continue;
}
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }

View File

@ -68,17 +68,7 @@ public class NoLimitScannerContext extends ScannerContext {
} }
@Override @Override
void setTimeProgress(long timeProgress) { void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@Override
void updateTimeProgress() {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@Override
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction // Do nothing. NoLimitScannerContext instances are immutable post-construction
} }

View File

@ -51,11 +51,13 @@ import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ScannerContext { public class ScannerContext {
/**
* Two sets of the same fields. One for the limits, another for the progress towards those limits
*/
LimitFields limits; LimitFields limits;
LimitFields progress; /**
* A different set of progress fields. Only include batch, dataSize and heapSize. Compare to
* LimitFields, ProgressFields doesn't contain time field. As we save a deadline in LimitFields,
* so use {@link System#currentTimeMillis()} directly when check time limit.
*/
ProgressFields progress;
/** /**
* The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)} * The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)}
@ -104,10 +106,12 @@ public class ScannerContext {
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) { ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
this.limits = new LimitFields(); this.limits = new LimitFields();
if (limitsToCopy != null) this.limits.copy(limitsToCopy); if (limitsToCopy != null) {
this.limits.copy(limitsToCopy);
}
// Progress fields are initialized to 0 // Progress fields are initialized to 0
progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0); progress = new ProgressFields(0, 0, 0);
this.keepProgress = keepProgress; this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE; this.scannerState = DEFAULT_STATE;
@ -160,13 +164,6 @@ public class ScannerContext {
progress.setHeapSize(curHeapSize + heapSize); progress.setHeapSize(curHeapSize + heapSize);
} }
/**
* Update the time progress with {@link System#currentTimeMillis()}
*/
void updateTimeProgress() {
progress.setTime(System.currentTimeMillis());
}
int getBatchProgress() { int getBatchProgress() {
return progress.getBatch(); return progress.getBatch();
} }
@ -179,14 +176,9 @@ public class ScannerContext {
return progress.getHeapSize(); return progress.getHeapSize();
} }
long getTimeProgress() { void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
return progress.getTime();
}
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
setBatchProgress(batchProgress); setBatchProgress(batchProgress);
setSizeProgress(sizeProgress, heapSizeProgress); setSizeProgress(sizeProgress, heapSizeProgress);
setTimeProgress(timeProgress);
} }
void setSizeProgress(long dataSizeProgress, long heapSizeProgress) { void setSizeProgress(long dataSizeProgress, long heapSizeProgress) {
@ -198,16 +190,12 @@ public class ScannerContext {
progress.setBatch(batchProgress); progress.setBatch(batchProgress);
} }
void setTimeProgress(long timeProgress) {
progress.setTime(timeProgress);
}
/** /**
* Clear away any progress that has been made so far. All progress fields are reset to initial * Clear away any progress that has been made so far. All progress fields are reset to initial
* values * values
*/ */
void clearProgress() { void clearProgress() {
progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0); progress.setFields(0, 0, 0);
} }
/** /**
@ -319,7 +307,7 @@ public class ScannerContext {
* @return true when the limit is enforceable from the checker's scope and it has been reached * @return true when the limit is enforceable from the checker's scope and it has been reached
*/ */
boolean checkTimeLimit(LimitScope checkerScope) { boolean checkTimeLimit(LimitScope checkerScope) {
return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime(); return hasTimeLimit(checkerScope) && (System.currentTimeMillis() >= limits.getTime());
} }
/** /**
@ -690,4 +678,81 @@ public class ScannerContext {
return sb.toString(); return sb.toString();
} }
} }
private static class ProgressFields {
private static int DEFAULT_BATCH = -1;
private static long DEFAULT_SIZE = -1L;
// The batch limit will always be enforced between cells, thus, there isn't a field to hold the
// batch scope
int batch = DEFAULT_BATCH;
// The sum of cell data sizes(key + value). The Cell data might be in on heap or off heap area.
long dataSize = DEFAULT_SIZE;
// The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as
// such AND data cells of Cells which are in on heap area.
long heapSize = DEFAULT_SIZE;
/**
* Fields keep their default values.
*/
ProgressFields() {
}
ProgressFields(int batch, long size, long heapSize) {
setFields(batch, size, heapSize);
}
/**
* Set all fields together.
*/
void setFields(int batch, long dataSize, long heapSize) {
setBatch(batch);
setDataSize(dataSize);
setHeapSize(heapSize);
}
int getBatch() {
return this.batch;
}
void setBatch(int batch) {
this.batch = batch;
}
long getDataSize() {
return this.dataSize;
}
long getHeapSize() {
return this.heapSize;
}
void setDataSize(long dataSize) {
this.dataSize = dataSize;
}
void setHeapSize(long heapSize) {
this.heapSize = heapSize;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("batch:");
sb.append(batch);
sb.append(", dataSize:");
sb.append(dataSize);
sb.append(", heapSize:");
sb.append(heapSize);
sb.append("}");
return sb.toString();
}
}
} }

View File

@ -552,7 +552,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
LOOP: do { LOOP: do {
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -79,12 +80,9 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
assertEquals(1, results.length); assertEquals(1, results.length);
assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS); assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
// we will always provide a scan cursor if time limit is reached. // we will always provide a scan cursor if time limit is reached.
if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) { assertTrue(controller.cursor().isPresent());
assertFalse(controller.cursor().isPresent());
} else {
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
controller.cursor().get().getRow()); controller.cursor().get().getRow());
}
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow()); assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
count++; count++;
} catch (Throwable e) { } catch (Throwable e) {

View File

@ -144,12 +144,6 @@ public class TestScannerHeartbeatMessages {
/** /**
* Make puts to put the input value into each combination of row, family, and qualifier * Make puts to put the input value into each combination of row, family, and qualifier
* @param rows
* @param families
* @param qualifiers
* @param value
* @return
* @throws IOException
*/ */
static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
byte[] value) throws IOException { byte[] value) throws IOException {
@ -189,8 +183,6 @@ public class TestScannerHeartbeatMessages {
* Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
* when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
* disabled, the test should throw an exception. * disabled, the test should throw an exception.
* @param testCallable
* @throws InterruptedException
*/ */
private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException { private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
HeartbeatRPCServices.heartbeatsEnabled = true; HeartbeatRPCServices.heartbeatsEnabled = true;
@ -217,7 +209,6 @@ public class TestScannerHeartbeatMessages {
/** /**
* Test the case that the time limit for the scan is reached after each full row of cells is * Test the case that the time limit for the scan is reached after each full row of cells is
* fetched. * fetched.
* @throws Exception
*/ */
@Test @Test
public void testHeartbeatBetweenRows() throws Exception { public void testHeartbeatBetweenRows() throws Exception {
@ -239,7 +230,6 @@ public class TestScannerHeartbeatMessages {
/** /**
* Test the case that the time limit for scans is reached in between column families * Test the case that the time limit for scans is reached in between column families
* @throws Exception
*/ */
@Test @Test
public void testHeartbeatBetweenColumnFamilies() throws Exception { public void testHeartbeatBetweenColumnFamilies() throws Exception {
@ -263,7 +253,7 @@ public class TestScannerHeartbeatMessages {
}); });
} }
public static class SparseFilter extends FilterBase { public static class SparseCellFilter extends FilterBase {
@Override @Override
public ReturnCode filterCell(final Cell v) throws IOException { public ReturnCode filterCell(final Cell v) throws IOException {
@ -277,23 +267,39 @@ public class TestScannerHeartbeatMessages {
} }
public static Filter parseFrom(final byte[] pbBytes) { public static Filter parseFrom(final byte[] pbBytes) {
return new SparseFilter(); return new SparseCellFilter();
}
}
public static class SparseRowFilter extends FilterBase {
@Override
public boolean filterRowKey(Cell cell) throws IOException {
try {
Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
}
public static Filter parseFrom(final byte[] pbBytes) {
return new SparseRowFilter();
} }
} }
/** /**
* Test the case that there is a filter which filters most of cells * Test the case that there is a filter which filters most of cells
* @throws Exception
*/ */
@Test @Test
public void testHeartbeatWithSparseFilter() throws Exception { public void testHeartbeatWithSparseCellFilter() throws Exception {
testImportanceOfHeartbeats(new Callable<Void>() { testImportanceOfHeartbeats(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
Scan scan = new Scan(); Scan scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE); scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE); scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseFilter()); scan.setFilter(new SparseCellFilter());
ResultScanner scanner = TABLE.getScanner(scan); ResultScanner scanner = TABLE.getScanner(scan);
int num = 0; int num = 0;
while (scanner.next() != null) { while (scanner.next() != null) {
@ -305,7 +311,7 @@ public class TestScannerHeartbeatMessages {
scan = new Scan(); scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE); scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE); scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseFilter()); scan.setFilter(new SparseCellFilter());
scan.setAllowPartialResults(true); scan.setAllowPartialResults(true);
scanner = TABLE.getScanner(scan); scanner = TABLE.getScanner(scan);
num = 0; num = 0;
@ -320,6 +326,31 @@ public class TestScannerHeartbeatMessages {
}); });
} }
/**
* Test the case that there is a filter which filters most of rows
*/
@Test
public void testHeartbeatWithSparseRowFilter() throws Exception {
testImportanceOfHeartbeats(new Callable<Void>() {
@Override
public Void call() throws Exception {
Scan scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseRowFilter());
ResultScanner scanner = TABLE.getScanner(scan);
int num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(1, num);
scanner.close();
return null;
}
});
}
/** /**
* Test the equivalence of a scan versus the same scan executed when heartbeat messages are * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
* necessary * necessary
@ -328,7 +359,6 @@ public class TestScannerHeartbeatMessages {
* @param cfSleepTime The time to sleep between fetches of column family cells * @param cfSleepTime The time to sleep between fetches of column family cells
* @param sleepBeforeCf set to true when column family sleeps should occur before the cells for * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
* that column family are fetched * that column family are fetched
* @throws Exception
*/ */
private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
int cfSleepTime, boolean sleepBeforeCf) throws Exception { int cfSleepTime, boolean sleepBeforeCf) throws Exception {
@ -361,8 +391,6 @@ public class TestScannerHeartbeatMessages {
/** /**
* Helper method for setting the time to sleep between rows and column families. If a sleep time * Helper method for setting the time to sleep between rows and column families. If a sleep time
* is negative then that sleep will be disabled * is negative then that sleep will be disabled
* @param rowSleepTime
* @param cfSleepTime
*/ */
private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) { private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0; HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;