HBASE-9778 Add hint to ExplicitColumnTracker to avoid seeking

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1576457 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2014-03-11 18:28:42 +00:00
parent fdd4e8a01b
commit d2eac30d68
6 changed files with 147 additions and 26 deletions

View File

@ -89,6 +89,23 @@ public class Scan extends Query {
private static final String RAW_ATTR = "_raw_";
private static final String ISOLATION_LEVEL = "_isolationlevel_";
/**
* EXPERT ONLY.
* An integer (not long) indicating to the scanner logic how many times we attempt to retrieve the
* next KV before we schedule a reseek.
* The right value depends on the size of the average KV. A reseek is more efficient when
* it can skip 5-10 KVs or 512B-1KB, or when the next KV is likely found in another HFile block.
* Setting this only has any effect when columns were added with
* {@link #addColumn(byte[], byte[])}
* <pre>{@code
* Scan s = new Scan(...);
* s.addColumn(...);
* s.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2));
* }</pre>
* Default is 0 (always reseek).
*/
public static final String HINT_LOOKAHEAD = "_look_ahead_";
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
private int maxVersions = 1;

View File

@ -56,6 +56,10 @@ public class ExplicitColumnTracker implements ColumnTracker {
private final int maxVersions;
private final int minVersions;
// hint for the tracker about how many KVs we will attempt to search via next()
// before we schedule a (re)seek operation
private final int lookAhead;
/**
* Contains the list of columns that the ExplicitColumnTracker is tracking.
* Each ColumnCount instance also tracks how many versions of the requested
@ -68,6 +72,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
* Used to eliminate duplicates. */
private long latestTSOfCurrentColumn;
private long oldestStamp;
private int skipCount;
/**
* Default constructor.
@ -76,11 +81,14 @@ public class ExplicitColumnTracker implements ColumnTracker {
* @param maxVersions maximum versions to return per column
* @param oldestUnexpiredTS the oldest timestamp we are interested in,
* based on TTL
* @param lookAhead number of KeyValues to look ahead via next before
* (re)seeking
*/
public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
int maxVersions, long oldestUnexpiredTS) {
int maxVersions, long oldestUnexpiredTS, int lookAhead) {
this.maxVersions = maxVersions;
this.minVersions = minVersions;
this.lookAhead = lookAhead;
this.oldestStamp = oldestUnexpiredTS;
this.columns = new ColumnCount[columns.size()];
int i=0;
@ -136,7 +144,8 @@ public class ExplicitColumnTracker implements ColumnTracker {
if (ret > 0) {
// The current KV is smaller than the column the ExplicitColumnTracker
// is interested in, so seek to that column of interest.
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
return this.skipCount++ < this.lookAhead ? ScanQueryMatcher.MatchCode.SKIP
: ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
}
// The current KV is bigger than the column the ExplicitColumnTracker
@ -145,6 +154,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
// column of interest, and check again.
if (ret <= -1) {
++this.index;
this.skipCount = 0;
if (done()) {
// No more to match, do not include, done with this row.
return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
@ -169,6 +179,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
// Done with versions for this column
++this.index;
this.skipCount = 0;
resetTS();
if (done()) {
// We have served all the requested columns.
@ -187,6 +198,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
// Called between every row.
public void reset() {
this.index = 0;
this.skipCount = 0;
this.column = this.columns[this.index];
for(ColumnCount col : this.columns) {
col.setCount(0);
@ -226,6 +238,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
resetTS();
if (compare <= 0) {
++this.index;
this.skipCount = 0;
if (done()) {
// Will not hit any more columns in this storefile
this.column = null;

View File

@ -185,8 +185,9 @@ public class ScanQueryMatcher {
// We can share the ExplicitColumnTracker, diff is we reset
// between rows, not between storefiles.
this.columns = new ExplicitColumnTracker(columns,
scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
byte[] attr = scan.getAttribute(Scan.HINT_LOOKAHEAD);
this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
oldestUnexpiredTS, attr == null ? 0 : Bytes.toInt(attr));
}
this.isReversed = scan.isReversed();
}

View File

@ -44,9 +44,9 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
private void runTest(int maxVersions,
TreeSet<byte[]> trackColumns,
List<byte[]> scannerColumns,
List<MatchCode> expected) throws IOException {
List<MatchCode> expected, int lookAhead) throws IOException {
ColumnTracker exp = new ExplicitColumnTracker(
trackColumns, 0, maxVersions, Long.MIN_VALUE);
trackColumns, 0, maxVersions, Long.MIN_VALUE, lookAhead);
//Initialize result
@ -95,7 +95,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
scanner.add(col4);
scanner.add(col5);
runTest(maxVersions, columns, scanner, expected);
runTest(maxVersions, columns, scanner, expected, 0);
}
public void testGet_MultiVersion() throws IOException{
@ -150,9 +150,63 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
scanner.add(col5);
//Initialize result
runTest(maxVersions, columns, scanner, expected);
runTest(maxVersions, columns, scanner, expected, 0);
}
public void testGet_MultiVersionWithLookAhead() throws IOException{
if(PRINT){
System.out.println("\nMultiVersion");
}
//Create tracker
TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
//Looking for every other
columns.add(col2);
columns.add(col4);
List<ScanQueryMatcher.MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE); // col2; 1st version
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL); // col2; 2nd version
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE); // col4; 1st version
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW); // col4; 2nd version
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
int maxVersions = 2;
//Create "Scanner"
List<byte[]> scanner = new ArrayList<byte[]>();
scanner.add(col1);
scanner.add(col1);
scanner.add(col1);
scanner.add(col2);
scanner.add(col2);
scanner.add(col2);
scanner.add(col3);
scanner.add(col3);
scanner.add(col3);
scanner.add(col4);
scanner.add(col4);
scanner.add(col4);
scanner.add(col5);
scanner.add(col5);
scanner.add(col5);
//Initialize result
runTest(maxVersions, columns, scanner, expected, 2);
}
/**
* hbase-2259
@ -165,7 +219,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
}
ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions,
Long.MIN_VALUE);
Long.MIN_VALUE, 0);
for (int i = 0; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
@ -193,7 +247,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
new ScanQueryMatcher.MatchCode[] {
ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
ScanQueryMatcher.MatchCode.SEEK_NEXT_COL });
runTest(1, columns, scanner, expected);
runTest(1, columns, scanner, expected, 0);
}
}

View File

@ -89,22 +89,8 @@ public class TestQueryMatcher extends HBaseTestCase {
}
public void testMatch_ExplicitColumns()
throws IOException {
//Moving up from the Tracker by using Gets and List<KeyValue> instead
//of just byte []
//Expected result
List<MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.DONE);
// 2,4,5
private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
// 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2),
EnvironmentEdgeManager.currentTimeMillis() - ttl);
@ -136,6 +122,42 @@ public class TestQueryMatcher extends HBaseTestCase {
}
}
public void testMatch_ExplicitColumns()
throws IOException {
//Moving up from the Tracker by using Gets and List<KeyValue> instead
//of just byte []
//Expected result
List<MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.DONE);
_testMatch_ExplicitColumns(scan, expected);
}
public void testMatch_ExplicitColumnsWithLookAhead()
throws IOException {
//Moving up from the Tracker by using Gets and List<KeyValue> instead
//of just byte []
//Expected result
List<MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.DONE);
Scan s = new Scan(scan);
s.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2));
_testMatch_ExplicitColumns(s, expected);
}
public void testMatch_Wildcard()
throws IOException {

View File

@ -472,6 +472,20 @@ Deferred log flush can be configured on tables via <link
in the input scan because attribute over-selection is a non-trivial performance penalty over large datasets.
</para>
</section>
<section xml:id="perf.hbase.client.seek">
<title>Avoid scan seeks</title>
<para>When columns are selected explicitly with <code>scan.addColumn</code>, HBase will schedule seek operations to seek between the
selected columns. When rows have few columns and each column has only a few versions this can be inefficient. A seek operation is generally
slower if does not seek at least past 5-10 columns/versions or 512-1024 bytes.</para>
<para>In order to opportunistically look ahead a few columns/versions to see if the next column/version can be found that
way before a seek operation is scheduled, a new attribute <code>Scan.HINT_LOOKAHEAD</code> can be set the on Scan object. The following code instructs the
RegionServer to attempt two iterations of next before a seek is scheduled:<programlisting>
Scan scan = new Scan();
scan.addColumn(...);
scan.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2));
table.getScanner(scan);
</programlisting></para>
</section>
<section xml:id="perf.hbase.mr.input">
<title>MapReduce - Input Splits</title>
<para>For MapReduce jobs that use HBase tables as a source, if there a pattern where the "slow" map tasks seem to