HBASE-17958 Avoid passing unexpected cell to ScanQueryMatcher when optimize SEEK to SKIP

This commit is contained in:
Guanghao Zhang 2017-04-27 14:07:21 +08:00
parent 4dbd025ccf
commit 91995749c2
5 changed files with 212 additions and 84 deletions

View File

@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -572,7 +571,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
checkScanOrder(prevCell, cell, comparator);
prevCell = cell;
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
qcode = optimize(qcode, cell);
switch (qcode) {
case INCLUDE:
case INCLUDE_AND_SEEK_NEXT_ROW:
@ -621,9 +619,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
matcher.clearCurrentRow();
seekToNextRow(cell);
seekOrSkipToNextRow(cell);
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
seekAsDirection(matcher.getKeyForNextColumn(cell));
seekOrSkipToNextColumn(cell);
} else {
this.heap.next();
}
@ -658,11 +656,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
matcher.clearCurrentRow();
seekToNextRow(cell);
seekOrSkipToNextRow(cell);
break;
case SEEK_NEXT_COL:
seekAsDirection(matcher.getKeyForNextColumn(cell));
seekOrSkipToNextColumn(cell);
break;
case SKIP:
@ -692,35 +690,47 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
private void seekOrSkipToNextRow(Cell cell) throws IOException {
// If it is a Get Scan, then we know that we are done with this row; there are no more
// rows beyond the current one: don't try to optimize.
if (!get) {
if (trySkipToNextRow(cell)) {
return;
}
}
seekToNextRow(cell);
}
private void seekOrSkipToNextColumn(Cell cell) throws IOException {
if (!trySkipToNextColumn(cell)) {
seekAsDirection(matcher.getKeyForNextColumn(cell));
}
}
/**
* See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
* This method works together with ColumnTrackers and Filters. ColumnTrackers may issue SEEK
* hints, such as seek to next column, next row, or seek to an arbitrary seek key.
* This method intercepts these qcodes and decides whether a seek is the most efficient _actual_
* way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
* current, loaded block).
* ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row,
* or seek to an arbitrary seek key. This method decides whether a seek is the most efficient
* _actual_ way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP,
* SKIP inside the current, loaded block).
* It does this by looking at the next indexed key of the current HFile. This key
* is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key
* on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with
* the current Cell but compare as though it were a seek key; see down in
* matcher.compareKeyForNextRow, etc). If the compare gets us onto the
* next block we *_SEEK, otherwise we just INCLUDE or SKIP, and let the ColumnTrackers or Filters
* go through the next Cell, and so on)
*
* <p>The ColumnTrackers and Filters must behave correctly in all cases, i.e. if they are past the
* Cells they care about they must issues a SKIP or SEEK.
* next block we *_SEEK, otherwise we just SKIP to the next requested cell.
*
* <p>Other notes:
* <ul>
* <li>Rows can straddle block boundaries</li>
* <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
* different block than column C1 at T2)</li>
* <li>We want to SKIP and INCLUDE if the chance is high that we'll find the desired Cell after a
* <li>We want to SKIP if the chance is high that we'll find the desired Cell after a
* few SKIPs...</li>
* <li>We want to INCLUDE_AND_SEEK and SEEK when the chance is high that we'll be able to seek
* <li>We want to SEEK when the chance is high that we'll be able to seek
* past many Cells, especially if we know we need to go to the next block.</li>
* </ul>
* <p>A good proxy (best effort) to determine whether INCLUDE/SKIP is better than SEEK is whether
* <p>A good proxy (best effort) to determine whether SKIP is better than SEEK is whether
* we'll likely end up seeking to the next block (or past the next block) to get our next column.
* Example:
* <pre>
@ -743,40 +753,44 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios
* where the SEEK will not land us in the next block, it is very likely better to issues a series
* of SKIPs.
* @param cell current cell
* @return true means skip to next row, false means not
*/
@VisibleForTesting
protected ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
switch(qcode) {
case INCLUDE_AND_SEEK_NEXT_COL:
case SEEK_NEXT_COL:
{
protected boolean trySkipToNextRow(Cell cell) throws IOException {
Cell nextCell = null;
do {
Cell nextIndexedKey = getNextIndexedKey();
if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
&& matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
this.heap.next();
++kvsScanned;
} else {
return false;
}
} while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
return true;
}
/**
* See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)}
* @param cell current cell
* @return true means skip to next column, false means not
*/
@VisibleForTesting
protected boolean trySkipToNextColumn(Cell cell) throws IOException {
Cell nextCell = null;
do {
Cell nextIndexedKey = getNextIndexedKey();
if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
&& matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
this.heap.next();
++kvsScanned;
} else {
return false;
}
break;
}
case INCLUDE_AND_SEEK_NEXT_ROW:
case SEEK_NEXT_ROW:
{
// If it is a Get Scan, then we know that we are done with this row; there are no more
// rows beyond the current one: don't try to optimize. We are DONE. Return the *_NEXT_ROW
// qcode as is. When the caller gets these flags on a Get Scan, it knows it can shut down the
// Scan.
if (!this.scan.isGetScan()) {
Cell nextIndexedKey = getNextIndexedKey();
if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
&& matcher.compareKeyForNextRow(nextIndexedKey, cell) > 0) {
return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
}
}
break;
}
default:
break;
}
return qcode;
} while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
return true;
}
// Implementation of ChangedReadersObserver

View File

@ -254,7 +254,6 @@ public class LegacyScanQueryMatcher extends ScanQueryMatcher {
case NEXT_COL:
return columns.getNextRowOrNextColumn(cell);
case NEXT_ROW:
stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW;
case SEEK_NEXT_USING_HINT:
return MatchCode.SEEK_NEXT_USING_HINT;
@ -284,12 +283,6 @@ public class LegacyScanQueryMatcher extends ScanQueryMatcher {
*/
colChecker = columns.checkVersions(cell, timestamp, typeByte,
mvccVersion > maxReadPointToTrackVersions);
//Optimize with stickyNextRow
boolean seekNextRowFromEssential = filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW &&
filter.isFamilyEssential(cell.getFamilyArray());
if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW || seekNextRowFromEssential) {
stickyNextRow = true;
}
if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (colChecker != MatchCode.SKIP) {
return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
@ -300,8 +293,6 @@ public class LegacyScanQueryMatcher extends ScanQueryMatcher {
colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
: colChecker;
}
stickyNextRow = (colChecker == MatchCode.SEEK_NEXT_ROW) ? true
: stickyNextRow;
return colChecker;
}

View File

@ -125,8 +125,6 @@ public abstract class ScanQueryMatcher implements ShipperListener {
/** Row the query is on */
protected Cell currentRow;
protected boolean stickyNextRow;
protected ScanQueryMatcher(Cell startKey, ScanInfo scanInfo, ColumnTracker columns,
long oldestUnexpiredTS, long now) {
this.rowComparator = scanInfo.getComparator();
@ -180,13 +178,8 @@ public abstract class ScanQueryMatcher implements ShipperListener {
if (rowComparator.compareRows(currentRow, cell) != 0) {
return MatchCode.DONE;
}
// optimize case.
if (this.stickyNextRow) {
return MatchCode.SEEK_NEXT_ROW;
}
if (this.columns.done()) {
stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW;
}
@ -275,7 +268,6 @@ public abstract class ScanQueryMatcher implements ShipperListener {
this.currentRow = currentRow;
columns.reset();
reset();
stickyNextRow = false;
}
public abstract boolean isUserScan();

View File

@ -109,9 +109,6 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
// STEP 1: Check if the column is part of the requested columns
MatchCode colChecker = columns.checkColumn(cell, typeByte);
if (colChecker != MatchCode.INCLUDE) {
if (colChecker == MatchCode.SEEK_NEXT_ROW) {
stickyNextRow = true;
}
return colChecker;
}
ReturnCode filterResponse = ReturnCode.SKIP;
@ -125,7 +122,6 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
case NEXT_COL:
return columns.getNextRowOrNextColumn(cell);
case NEXT_ROW:
stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW;
case SEEK_NEXT_USING_HINT:
return MatchCode.SEEK_NEXT_USING_HINT;
@ -154,12 +150,6 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
* FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
*/
colChecker = columns.checkVersions(cell, timestamp, typeByte, false);
// Optimize with stickyNextRow
boolean seekNextRowFromEssential = filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW
&& filter.isFamilyEssential(cell.getFamilyArray());
if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW || seekNextRowFromEssential) {
stickyNextRow = true;
}
if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (colChecker != MatchCode.SKIP) {
return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -127,13 +127,30 @@ public class TestStoreScanner {
CellUtil.createCell(FIVE, CF, ZERO, 1L, KeyValue.Type.Put.getCode(), VALUE),
};
private static class KeyValueHeapWithCount extends KeyValueHeap {
final AtomicInteger count;
public KeyValueHeapWithCount(List<? extends KeyValueScanner> scanners,
CellComparator comparator, AtomicInteger count) throws IOException {
super(scanners, comparator);
this.count = count;
}
@Override
public Cell peek() {
this.count.incrementAndGet();
return super.peek();
}
}
/**
* A StoreScanner for our CELL_GRID above. Fakes the block transitions. Does counts of
* calls to optimize and counts of when optimize actually did an optimize.
*/
private static class CellGridStoreScanner extends StoreScanner {
// Count of how often optimize is called and of how often it does an optimize.
final AtomicInteger count = new AtomicInteger(0);
AtomicInteger count;
final AtomicInteger optimization = new AtomicInteger(0);
CellGridStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType)
@ -143,16 +160,33 @@ public class TestStoreScanner {
new KeyValueScanner[] {new KeyValueScanFixture(CellComparator.COMPARATOR, CELL_GRID)}));
}
protected ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
count.incrementAndGet();
ScanQueryMatcher.MatchCode after = super.optimize(qcode, cell);
LOG.info("Cell=" + cell + ", nextIndex=" + CellUtil.toString(getNextIndexedKey(), false) +
", before=" + qcode + ", after=" + after);
if (qcode != after) {
protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
CellComparator comparator) throws IOException {
if (count == null) {
count = new AtomicInteger(0);
}
heap = new KeyValueHeapWithCount(scanners, comparator, count);
}
protected boolean trySkipToNextRow(Cell cell) throws IOException {
boolean optimized = super.trySkipToNextRow(cell);
LOG.info("Cell=" + cell + ", nextIndex=" + CellUtil.toString(getNextIndexedKey(), false)
+ ", optimized=" + optimized);
if (optimized) {
optimization.incrementAndGet();
}
return after;
};
return optimized;
}
protected boolean trySkipToNextColumn(Cell cell) throws IOException {
boolean optimized = super.trySkipToNextColumn(cell);
LOG.info("Cell=" + cell + ", nextIndex=" + CellUtil.toString(getNextIndexedKey(), false)
+ ", optimized=" + optimized);
if (optimized) {
optimization.incrementAndGet();
}
return optimized;
}
@Override
public Cell getNextIndexedKey() {
@ -167,6 +201,113 @@ public class TestStoreScanner {
}
};
private static final int CELL_WITH_VERSIONS_BLOCK2_BOUNDARY = 4;
private static final Cell [] CELL_WITH_VERSIONS = new Cell [] {
CellUtil.createCell(ONE, CF, ONE, 2L, KeyValue.Type.Put.getCode(), VALUE),
CellUtil.createCell(ONE, CF, ONE, 1L, KeyValue.Type.Put.getCode(), VALUE),
CellUtil.createCell(ONE, CF, TWO, 2L, KeyValue.Type.Put.getCode(), VALUE),
CellUtil.createCell(ONE, CF, TWO, 1L, KeyValue.Type.Put.getCode(), VALUE),
// Offset 4 CELL_WITH_VERSIONS_BLOCK2_BOUNDARY
CellUtil.createCell(TWO, CF, ONE, 1L, KeyValue.Type.Put.getCode(), VALUE),
CellUtil.createCell(TWO, CF, TWO, 1L, KeyValue.Type.Put.getCode(), VALUE),
};
private static class CellWithVersionsStoreScanner extends StoreScanner {
// Count of how often optimize is called and of how often it does an optimize.
final AtomicInteger optimization = new AtomicInteger(0);
CellWithVersionsStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType)
throws IOException {
super(scan, scanInfo, scanType, scan.getFamilyMap().get(CF), Arrays
.<KeyValueScanner> asList(new KeyValueScanner[] { new KeyValueScanFixture(
CellComparator.COMPARATOR, CELL_WITH_VERSIONS) }));
}
protected boolean trySkipToNextColumn(Cell cell) throws IOException {
boolean optimized = super.trySkipToNextColumn(cell);
LOG.info("Cell=" + cell + ", nextIndex=" + CellUtil.toString(getNextIndexedKey(), false)
+ ", optimized=" + optimized);
if (optimized) {
optimization.incrementAndGet();
}
return optimized;
}
@Override
public Cell getNextIndexedKey() {
// Fake block boundaries by having index of next block change as we go through scan.
return CellUtil.createFirstOnRow(CELL_WITH_VERSIONS[CELL_WITH_VERSIONS_BLOCK2_BOUNDARY]);
}
};
private static class CellWithVersionsNoOptimizeStoreScanner extends StoreScanner {
// Count of how often optimize is called and of how often it does an optimize.
final AtomicInteger optimization = new AtomicInteger(0);
CellWithVersionsNoOptimizeStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType)
throws IOException {
super(scan, scanInfo, scanType, scan.getFamilyMap().get(CF), Arrays
.<KeyValueScanner> asList(new KeyValueScanner[] { new KeyValueScanFixture(
CellComparator.COMPARATOR, CELL_WITH_VERSIONS) }));
}
protected boolean trySkipToNextColumn(Cell cell) throws IOException {
boolean optimized = super.trySkipToNextColumn(cell);
LOG.info("Cell=" + cell + ", nextIndex=" + CellUtil.toString(getNextIndexedKey(), false)
+ ", optimized=" + optimized);
if (optimized) {
optimization.incrementAndGet();
}
return optimized;
}
@Override
public Cell getNextIndexedKey() {
return null;
}
};
@Test
public void testWithColumnCountGetFilter() throws Exception {
Get get = new Get(ONE);
get.setMaxVersions();
get.addFamily(CF);
get.setFilter(new ColumnCountGetFilter(2));
CellWithVersionsNoOptimizeStoreScanner scannerNoOptimize = new CellWithVersionsNoOptimizeStoreScanner(
new Scan(get), this.scanInfo, this.scanType);
try {
List<Cell> results = new ArrayList<>();
while (scannerNoOptimize.next(results)) {
continue;
}
Assert.assertEquals(2, results.size());
Assert.assertTrue(CellUtil.matchingColumn(results.get(0), CELL_WITH_VERSIONS[0]));
Assert.assertTrue(CellUtil.matchingColumn(results.get(1), CELL_WITH_VERSIONS[2]));
Assert.assertTrue("Optimize should do some optimizations",
scannerNoOptimize.optimization.get() == 0);
} finally {
scannerNoOptimize.close();
}
get.setFilter(new ColumnCountGetFilter(2));
CellWithVersionsStoreScanner scanner = new CellWithVersionsStoreScanner(new Scan(get),
this.scanInfo, this.scanType);
try {
List<Cell> results = new ArrayList<>();
while (scanner.next(results)) {
continue;
}
Assert.assertEquals(2, results.size());
Assert.assertTrue(CellUtil.matchingColumn(results.get(0), CELL_WITH_VERSIONS[0]));
Assert.assertTrue(CellUtil.matchingColumn(results.get(1), CELL_WITH_VERSIONS[2]));
Assert.assertTrue("Optimize should do some optimizations", scanner.optimization.get() > 0);
} finally {
scanner.close();
}
}
/*
* Test utility for building a NavigableSet for scanners.
* @param strCols