HBASE-19818 Scan time limit not work if the filter always filter row key

This commit is contained in:
Guanghao Zhang 2018-01-25 18:16:23 +08:00
parent c16dae13f2
commit c88e570dc1
6 changed files with 188 additions and 57 deletions

View File

@ -6390,7 +6390,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int initialBatchProgress = scannerContext.getBatchProgress();
long initialSizeProgress = scannerContext.getDataSizeProgress();
long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
long initialTimeProgress = scannerContext.getTimeProgress();
// Used to check time limit
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
// The loop here is used only when at some point during the next we determine
// that due to effects of filters or otherwise, we have an empty row in the result.
@ -6403,7 +6405,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (scannerContext.getKeepProgress()) {
// Progress should be kept. Reset to initial values seen at start of method invocation.
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialHeapSizeProgress, initialTimeProgress);
initialHeapSizeProgress);
} else {
scannerContext.clearProgress();
}
@ -6442,6 +6444,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
limitScope = LimitScope.BETWEEN_ROWS;
}
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
if (hasFilterRow) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scans that must " +
" stop mid-row because of a limit. ScannerContext:" + scannerContext);
}
return true;
}
// Check if we were getting data from the joinedHeap and hit the limit.
@ -6472,6 +6484,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
results.clear();
// Read nothing as the rowkey was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
return true;
}
continue;
}
@ -6498,16 +6515,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ret = filter.filterRowCellsWithRet(results);
// We don't know how the results have changed after being filtered. Must set progress
// according to contents of results now. However, a change in the results should not
// affect the time progress. Thus preserve whatever time progress has been made
long timeProgress = scannerContext.getTimeProgress();
// according to contents of results now.
if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialHeapSizeProgress, initialTimeProgress);
initialHeapSizeProgress);
} else {
scannerContext.clearProgress();
}
scannerContext.setTimeProgress(timeProgress);
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
@ -6525,7 +6539,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
if (!shouldStop) continue;
if (!shouldStop) {
// Read nothing as the cells was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
return true;
}
continue;
}
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}

View File

@ -68,17 +68,7 @@ public class NoLimitScannerContext extends ScannerContext {
}
@Override
void setTimeProgress(long timeProgress) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@Override
void updateTimeProgress() {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@Override
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}

View File

@ -51,11 +51,13 @@ import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
@InterfaceStability.Evolving
public class ScannerContext {
/**
* Two sets of the same fields. One for the limits, another for the progress towards those limits
*/
LimitFields limits;
LimitFields progress;
/**
* A different set of progress fields. Only include batch, dataSize and heapSize. Compare to
* LimitFields, ProgressFields doesn't contain time field. As we save a deadline in LimitFields,
* so use {@link System#currentTimeMillis()} directly when check time limit.
*/
ProgressFields progress;
/**
* The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)}
@ -104,10 +106,12 @@ public class ScannerContext {
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
this.limits = new LimitFields();
if (limitsToCopy != null) this.limits.copy(limitsToCopy);
if (limitsToCopy != null) {
this.limits.copy(limitsToCopy);
}
// Progress fields are initialized to 0
progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
progress = new ProgressFields(0, 0, 0);
this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE;
@ -162,9 +166,10 @@ public class ScannerContext {
/**
* Update the time progress with {@link System#currentTimeMillis()}
* @deprecated will be removed in 3.0
*/
@Deprecated
void updateTimeProgress() {
progress.setTime(System.currentTimeMillis());
}
int getBatchProgress() {
@ -179,14 +184,25 @@ public class ScannerContext {
return progress.getHeapSize();
}
/**
* @deprecated will be removed in 3.0
*/
@Deprecated
long getTimeProgress() {
return progress.getTime();
return System.currentTimeMillis();
}
/**
* @deprecated will be removed in 3.0
*/
@Deprecated
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
setProgress(batchProgress, sizeProgress, heapSizeProgress);
}
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
setBatchProgress(batchProgress);
setSizeProgress(sizeProgress, heapSizeProgress);
setTimeProgress(timeProgress);
}
void setSizeProgress(long dataSizeProgress, long heapSizeProgress) {
@ -198,8 +214,11 @@ public class ScannerContext {
progress.setBatch(batchProgress);
}
/**
* @deprecated will be removed in 3.0
*/
@Deprecated
void setTimeProgress(long timeProgress) {
progress.setTime(timeProgress);
}
/**
@ -207,7 +226,7 @@ public class ScannerContext {
* values
*/
void clearProgress() {
progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
progress.setFields(0, 0, 0);
}
/**
@ -319,7 +338,7 @@ public class ScannerContext {
* @return true when the limit is enforceable from the checker's scope and it has been reached
*/
boolean checkTimeLimit(LimitScope checkerScope) {
return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime();
return hasTimeLimit(checkerScope) && (System.currentTimeMillis() >= limits.getTime());
}
/**
@ -690,4 +709,81 @@ public class ScannerContext {
return sb.toString();
}
}
private static class ProgressFields {
private static int DEFAULT_BATCH = -1;
private static long DEFAULT_SIZE = -1L;
// The batch limit will always be enforced between cells, thus, there isn't a field to hold the
// batch scope
int batch = DEFAULT_BATCH;
// The sum of cell data sizes(key + value). The Cell data might be in on heap or off heap area.
long dataSize = DEFAULT_SIZE;
// The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as
// such AND data cells of Cells which are in on heap area.
long heapSize = DEFAULT_SIZE;
/**
* Fields keep their default values.
*/
ProgressFields() {
}
ProgressFields(int batch, long size, long heapSize) {
setFields(batch, size, heapSize);
}
/**
* Set all fields together.
*/
void setFields(int batch, long dataSize, long heapSize) {
setBatch(batch);
setDataSize(dataSize);
setHeapSize(heapSize);
}
int getBatch() {
return this.batch;
}
void setBatch(int batch) {
this.batch = batch;
}
long getDataSize() {
return this.dataSize;
}
long getHeapSize() {
return this.heapSize;
}
void setDataSize(long dataSize) {
this.dataSize = dataSize;
}
void setHeapSize(long heapSize) {
this.heapSize = heapSize;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("batch:");
sb.append(batch);
sb.append(", dataSize:");
sb.append(dataSize);
sb.append(", heapSize:");
sb.append(heapSize);
sb.append("}");
return sb.toString();
}
}
}

View File

@ -552,7 +552,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
LOOP: do {
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@ -79,12 +80,9 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
assertEquals(1, results.length);
assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
// we will always provide a scan cursor if time limit is reached.
if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) {
assertFalse(controller.cursor().isPresent());
} else {
assertTrue(controller.cursor().isPresent());
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
controller.cursor().get().getRow());
}
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
count++;
} catch (Throwable e) {

View File

@ -144,12 +144,6 @@ public class TestScannerHeartbeatMessages {
/**
* Make puts to put the input value into each combination of row, family, and qualifier
* @param rows
* @param families
* @param qualifiers
* @param value
* @return
* @throws IOException
*/
static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
byte[] value) throws IOException {
@ -189,8 +183,6 @@ public class TestScannerHeartbeatMessages {
* Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
* when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
* disabled, the test should throw an exception.
* @param testCallable
* @throws InterruptedException
*/
private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
HeartbeatRPCServices.heartbeatsEnabled = true;
@ -217,7 +209,6 @@ public class TestScannerHeartbeatMessages {
/**
* Test the case that the time limit for the scan is reached after each full row of cells is
* fetched.
* @throws Exception
*/
@Test
public void testHeartbeatBetweenRows() throws Exception {
@ -239,7 +230,6 @@ public class TestScannerHeartbeatMessages {
/**
* Test the case that the time limit for scans is reached in between column families
* @throws Exception
*/
@Test
public void testHeartbeatBetweenColumnFamilies() throws Exception {
@ -263,7 +253,7 @@ public class TestScannerHeartbeatMessages {
});
}
public static class SparseFilter extends FilterBase {
public static class SparseCellFilter extends FilterBase {
@Override
public ReturnCode filterCell(final Cell v) throws IOException {
@ -277,23 +267,39 @@ public class TestScannerHeartbeatMessages {
}
public static Filter parseFrom(final byte[] pbBytes) {
return new SparseFilter();
return new SparseCellFilter();
}
}
public static class SparseRowFilter extends FilterBase {
@Override
public boolean filterRowKey(Cell cell) throws IOException {
try {
Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
}
public static Filter parseFrom(final byte[] pbBytes) {
return new SparseRowFilter();
}
}
/**
* Test the case that there is a filter which filters most of cells
* @throws Exception
*/
@Test
public void testHeartbeatWithSparseFilter() throws Exception {
public void testHeartbeatWithSparseCellFilter() throws Exception {
testImportanceOfHeartbeats(new Callable<Void>() {
@Override
public Void call() throws Exception {
Scan scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseFilter());
scan.setFilter(new SparseCellFilter());
ResultScanner scanner = TABLE.getScanner(scan);
int num = 0;
while (scanner.next() != null) {
@ -305,7 +311,7 @@ public class TestScannerHeartbeatMessages {
scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseFilter());
scan.setFilter(new SparseCellFilter());
scan.setAllowPartialResults(true);
scanner = TABLE.getScanner(scan);
num = 0;
@ -320,6 +326,31 @@ public class TestScannerHeartbeatMessages {
});
}
/**
* Test the case that there is a filter which filters most of rows
*/
@Test
public void testHeartbeatWithSparseRowFilter() throws Exception {
testImportanceOfHeartbeats(new Callable<Void>() {
@Override
public Void call() throws Exception {
Scan scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseRowFilter());
ResultScanner scanner = TABLE.getScanner(scan);
int num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(1, num);
scanner.close();
return null;
}
});
}
/**
* Test the equivalence of a scan versus the same scan executed when heartbeat messages are
* necessary
@ -328,7 +359,6 @@ public class TestScannerHeartbeatMessages {
* @param cfSleepTime The time to sleep between fetches of column family cells
* @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
* that column family are fetched
* @throws Exception
*/
private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
int cfSleepTime, boolean sleepBeforeCf) throws Exception {
@ -361,8 +391,6 @@ public class TestScannerHeartbeatMessages {
/**
* Helper method for setting the time to sleep between rows and column families. If a sleep time
* is negative then that sleep will be disabled
* @param rowSleepTime
* @param cfSleepTime
*/
private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;