diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 80430e6a04e..bf968df7ff4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -89,6 +89,23 @@ public class Scan extends Query {
   private static final String RAW_ATTR = "_raw_";
   private static final String ISOLATION_LEVEL = "_isolationlevel_";
 
+  /**
+   * EXPERT ONLY.
+   * An integer (not long) indicating to the scanner logic how many times we attempt to retrieve the
+   * next KV before we schedule a reseek.
+   * The right value depends on the size of the average KV. A reseek is more efficient when
+   * it can skip 5-10 KVs or 512B-1KB, or when the next KV is likely found in another HFile block.
+   * Setting this only has any effect when columns were added with
+   * {@link #addColumn(byte[], byte[])}
+   * <pre>{@code
+   * Scan s = new Scan(...);
+   * s.addColumn(...);
+   * s.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2));
+   * }</pre>
+   * Default is 0 (always reseek).
+   */
+  public static final String HINT_LOOKAHEAD = "_look_ahead_";
+
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
index 13594091ba3..eceade5c79f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
@@ -56,6 +56,10 @@ public class ExplicitColumnTracker implements ColumnTracker {
   private final int maxVersions;
   private final int minVersions;
 
+  // hint for the tracker about how many KVs we will attempt to search via next()
+  // before we schedule a (re)seek operation
+  private final int lookAhead;
+
   /**
    * Contains the list of columns that the ExplicitColumnTracker is tracking.
    * Each ColumnCount instance also tracks how many versions of the requested
@@ -68,6 +72,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
    * Used to eliminate duplicates. */
   private long latestTSOfCurrentColumn;
   private long oldestStamp;
+  private int skipCount;
 
   /**
    * Default constructor.
@@ -76,11 +81,14 @@ public class ExplicitColumnTracker implements ColumnTracker {
    * @param maxVersions maximum versions to return per column
    * @param oldestUnexpiredTS the oldest timestamp we are interested in,
    *  based on TTL
+   * @param lookAhead number of KeyValues to look ahead via next before
+   *  (re)seeking
    */
   public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
-      int maxVersions, long oldestUnexpiredTS) {
+      int maxVersions, long oldestUnexpiredTS, int lookAhead) {
     this.maxVersions = maxVersions;
     this.minVersions = minVersions;
+    this.lookAhead = lookAhead;
     this.oldestStamp = oldestUnexpiredTS;
     this.columns = new ColumnCount[columns.size()];
     int i=0;
@@ -136,7 +144,8 @@ public class ExplicitColumnTracker implements ColumnTracker {
     if (ret > 0) {
       // The current KV is smaller than the column the ExplicitColumnTracker
       // is interested in, so seek to that column of interest.
-      return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
+      return this.skipCount++ < this.lookAhead ? ScanQueryMatcher.MatchCode.SKIP
+          : ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
     }
 
     // The current KV is bigger than the column the ExplicitColumnTracker
@@ -145,6 +154,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
     // column of interest, and check again.
     if (ret <= -1) {
       ++this.index;
+      this.skipCount = 0;
       if (done()) {
         // No more to match, do not include, done with this row.
         return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
@@ -169,6 +179,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
     if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
       // Done with versions for this column
       ++this.index;
+      this.skipCount = 0;
       resetTS();
       if (done()) {
         // We have served all the requested columns.
@@ -187,6 +198,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
   // Called between every row.
   public void reset() {
     this.index = 0;
+    this.skipCount = 0;
     this.column = this.columns[this.index];
     for(ColumnCount col : this.columns) {
       col.setCount(0);
@@ -226,6 +238,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
       resetTS();
       if (compare <= 0) {
         ++this.index;
+        this.skipCount = 0;
         if (done()) {
           // Will not hit any more columns in this storefile
           this.column = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 07d675f20dd..92eefd8e233 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -185,8 +185,9 @@ public class ScanQueryMatcher {
       // We can share the ExplicitColumnTracker, diff is we reset
       // between rows, not between storefiles.
-      this.columns = new ExplicitColumnTracker(columns,
-        scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
+      byte[] attr = scan.getAttribute(Scan.HINT_LOOKAHEAD);
+      this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
+        oldestUnexpiredTS, attr == null ? 0 : Bytes.toInt(attr));
     }
     this.isReversed = scan.isReversed();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
index 21ecf8009e8..c259312b41d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
@@ -44,9 +44,9 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
   private void runTest(int maxVersions,
                        TreeSet<byte[]> trackColumns,
                        List<byte[]> scannerColumns,
-                       List<ScanQueryMatcher.MatchCode> expected) throws IOException {
+                       List<ScanQueryMatcher.MatchCode> expected, int lookAhead) throws IOException {
     ColumnTracker exp = new ExplicitColumnTracker(
-      trackColumns, 0, maxVersions, Long.MIN_VALUE);
+      trackColumns, 0, maxVersions, Long.MIN_VALUE, lookAhead);
 
     //Initialize result
@@ -95,7 +95,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
     scanner.add(col4);
     scanner.add(col5);
 
-    runTest(maxVersions, columns, scanner, expected);
+    runTest(maxVersions, columns, scanner, expected, 0);
   }
 
   public void testGet_MultiVersion() throws IOException{
@@ -150,9 +150,63 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
     scanner.add(col5);
 
     //Initialize result
-    runTest(maxVersions, columns, scanner, expected);
+    runTest(maxVersions, columns, scanner, expected, 0);
   }
 
+  public void testGet_MultiVersionWithLookAhead() throws IOException{
+    if(PRINT){
+      System.out.println("\nMultiVersion");
+    }
+
+    //Create tracker
+    TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    //Looking for every other
+    columns.add(col2);
+    columns.add(col4);
+
+    List<ScanQueryMatcher.MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
+    expected.add(ScanQueryMatcher.MatchCode.SKIP);
+    expected.add(ScanQueryMatcher.MatchCode.SKIP);
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
+
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE); // col2; 1st version
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL); // col2; 2nd version
+    expected.add(ScanQueryMatcher.MatchCode.SKIP);
+
+    expected.add(ScanQueryMatcher.MatchCode.SKIP);
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
+
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE); // col4; 1st version
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW); // col4; 2nd version
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
+
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
+    int maxVersions = 2;
+
+    //Create "Scanner"
+    List<byte[]> scanner = new ArrayList<byte[]>();
+    scanner.add(col1);
+    scanner.add(col1);
+    scanner.add(col1);
+    scanner.add(col2);
+    scanner.add(col2);
+    scanner.add(col2);
+    scanner.add(col3);
+    scanner.add(col3);
+    scanner.add(col3);
+    scanner.add(col4);
+    scanner.add(col4);
+    scanner.add(col4);
+    scanner.add(col5);
+    scanner.add(col5);
+    scanner.add(col5);
+
+    //Initialize result
+    runTest(maxVersions, columns, scanner, expected, 2);
+  }
 
   /**
    * hbase-2259
    */
@@ -165,7 +219,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
     }
 
     ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions,
-        Long.MIN_VALUE);
+        Long.MIN_VALUE, 0);
     for (int i = 0; i < 100000; i+=2) {
       byte [] col = Bytes.toBytes("col"+i);
       ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
@@ -193,7 +247,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
         new ScanQueryMatcher.MatchCode[] {
           ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
           ScanQueryMatcher.MatchCode.SEEK_NEXT_COL });
-    runTest(1, columns, scanner, expected);
+    runTest(1, columns, scanner, expected, 0);
   }
 
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index 70f62a389b1..37031a4d3f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -89,22 +89,8 @@ public class TestQueryMatcher extends HBaseTestCase {
   }
 
-  public void testMatch_ExplicitColumns()
-  throws IOException {
-    //Moving up from the Tracker by using Gets and List instead
-    //of just byte []
-
-    //Expected result
-    List<ScanQueryMatcher.MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
-    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
-    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
-    expected.add(ScanQueryMatcher.MatchCode.DONE);
-
-    // 2,4,5
-
+  private void _testMatch_ExplicitColumns(Scan scan, List<ScanQueryMatcher.MatchCode> expected) throws IOException {
+    // 2,4,5
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2),
         EnvironmentEdgeManager.currentTimeMillis() - ttl);
@@ -136,6 +122,42 @@ public class TestQueryMatcher extends HBaseTestCase {
     }
   }
 
+  public void testMatch_ExplicitColumns()
+  throws IOException {
+    //Moving up from the Tracker by using Gets and List instead
+    //of just byte []
+
+    //Expected result
+    List<ScanQueryMatcher.MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
+    expected.add(ScanQueryMatcher.MatchCode.DONE);
+
+    _testMatch_ExplicitColumns(scan, expected);
+  }
+
+  public void testMatch_ExplicitColumnsWithLookAhead()
+  throws IOException {
+    //Moving up from the Tracker by using Gets and List instead
+    //of just byte []
+
+    //Expected result
+    List<ScanQueryMatcher.MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
+    expected.add(ScanQueryMatcher.MatchCode.SKIP);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.SKIP);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
+    expected.add(ScanQueryMatcher.MatchCode.DONE);
+
+    Scan s = new Scan(scan);
+    s.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2));
+    _testMatch_ExplicitColumns(s, expected);
+  }
+
   public void testMatch_Wildcard()
   throws IOException {
diff --git a/src/main/docbkx/performance.xml b/src/main/docbkx/performance.xml
index c7021c1d455..a282aca78af 100644
--- a/src/main/docbkx/performance.xml
+++ b/src/main/docbkx/performance.xml
@@ -472,6 +472,20 @@ Deferred log flush can be configured on tables via
+    <section>
+      <title>Avoid scan seeks</title>
+      <para>When columns are selected explicitly with scan.addColumn, HBase will schedule seek operations to seek between the
+      selected columns. When rows have few columns and each column has only a few versions this can be inefficient. A seek operation is generally
+      slower if it does not seek at least past 5-10 columns/versions or 512-1024 bytes.</para>
+      <para>In order to opportunistically look ahead a few columns/versions to see if the next column/version can be found that
+      way before a seek operation is scheduled, a new attribute Scan.HINT_LOOKAHEAD can be set on the Scan object. The following code instructs the
+      RegionServer to attempt two iterations of next before a seek is scheduled:</para>
+<programlisting>Scan scan = new Scan();
+scan.addColumn(...);
+scan.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2));
+table.getScanner(scan);
+</programlisting>
+    </section>
     <title>MapReduce - Input Splits</title>
     <para>For MapReduce jobs that use HBase tables as a source, if there a pattern where the "slow" map tasks seem to
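
For readers skimming the patch, the heart of the change is the skip-versus-seek decision in ExplicitColumnTracker: while the current KV still sorts before the next requested column, the tracker now answers SKIP up to lookAhead times (so the scanner keeps calling next()), then falls back to scheduling a seek via SEEK_NEXT_COL, and the counter is reset whenever the tracker advances to another requested column. The snippet below is a minimal, self-contained sketch of that decision in plain Java; the class, enum, and method names are illustrative stand-ins, not the actual ExplicitColumnTracker API.

// Sketch of the lookahead decision added by this patch: return SKIP up to
// `lookAhead` times for KVs that sort before the wanted column, then seek.
public class LookaheadSketch {
  enum MatchCode { SKIP, SEEK_NEXT_COL }   // stand-ins for ScanQueryMatcher.MatchCode values

  private final int lookAhead;             // value of the Scan.HINT_LOOKAHEAD attribute (0 = always seek)
  private int skipCount;                   // reset whenever the tracker advances to a new column

  LookaheadSketch(int lookAhead) {
    this.lookAhead = lookAhead;
  }

  /** Called for each KV that sorts before the column of interest. */
  MatchCode onKvBeforeWantedColumn() {
    return skipCount++ < lookAhead ? MatchCode.SKIP : MatchCode.SEEK_NEXT_COL;
  }

  /** Called when the tracker moves on to the next requested column. */
  void onColumnAdvance() {
    skipCount = 0;
  }

  public static void main(String[] args) {
    LookaheadSketch tracker = new LookaheadSketch(2);
    // With a hint of 2, the first two non-matching KVs are skipped via next(),
    // the third triggers a seek: prints SKIP, SKIP, SEEK_NEXT_COL.
    for (int i = 0; i < 3; i++) {
      System.out.println(tracker.onKvBeforeWantedColumn());
    }
  }
}

On the client side the hint is supplied exactly as in the javadoc and docbook examples above, scan.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(n)), with n chosen so that a seek is only scheduled once it would skip enough KVs (roughly 5-10, or 512B-1KB) to pay for itself.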