HBASE-1647 Filter#filterRow is called too often, filters rows it shouldn't have -- reversed it for a moment; it may have broken things -- not sure yet
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@798510 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
69dc5b571e
commit
3086b191dd
|
@ -295,8 +295,6 @@ Release 0.20.0 - Unreleased
|
|||
(Tim Sell and Ryan Rawson via Stack)
|
||||
HBASE-1703 ICVs across /during a flush can cause multiple keys with the
|
||||
same TS (bad)
|
||||
HBASE-1647 Filter#filterRow is called too often, filters rows it
|
||||
shouldn't have (Doğacan Güney via Ryan Rawson and Stack)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
||||
|
|
|
@ -53,8 +53,6 @@ import org.apache.hadoop.hbase.client.Get;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.Reference.Range;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
|
@ -1683,13 +1681,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
class RegionScanner implements InternalScanner {
|
||||
private final KeyValueHeap storeHeap;
|
||||
private final byte [] stopRow;
|
||||
private Filter filter;
|
||||
private RowFilterInterface oldFilter;
|
||||
private List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
|
||||
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
|
||||
this.filter = scan.getFilter();
|
||||
this.oldFilter = scan.getOldFilter();
|
||||
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
|
||||
this.stopRow = null;
|
||||
} else {
|
||||
|
@ -1713,74 +1706,46 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
this(scan, null);
|
||||
}
|
||||
|
||||
private void resetFilters() {
|
||||
if (filter != null) {
|
||||
filter.reset();
|
||||
}
|
||||
if (oldFilter != null) {
|
||||
oldFilter.reset();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next row of results from this region.
|
||||
* @param results list to append results to
|
||||
* @return true if there are more rows, false if scanner is done
|
||||
*/
|
||||
@Override
|
||||
public boolean next(List<KeyValue> outResults) throws IOException {
|
||||
results.clear();
|
||||
boolean returnResult = nextInternal();
|
||||
if (!returnResult && filter != null && filter.filterRow()) {
|
||||
results.clear();
|
||||
}
|
||||
outResults.addAll(results);
|
||||
resetFilters();
|
||||
return returnResult;
|
||||
}
|
||||
|
||||
private boolean nextInternal() throws IOException {
|
||||
public boolean next(List<KeyValue> results)
|
||||
throws IOException {
|
||||
// This method should probably be reorganized a bit... has gotten messy
|
||||
KeyValue kv;
|
||||
byte[] currentRow = null;
|
||||
boolean filterCurrentRow = false;
|
||||
KeyValue kv = this.storeHeap.peek();
|
||||
if (kv == null) {
|
||||
return false;
|
||||
}
|
||||
byte [] currentRow = kv.getRow();
|
||||
// See if we passed stopRow
|
||||
if (stopRow != null &&
|
||||
comparator.compareRows(stopRow, 0, stopRow.length,
|
||||
currentRow, 0, currentRow.length) <= 0) {
|
||||
return false;
|
||||
}
|
||||
this.storeHeap.next(results);
|
||||
while(true) {
|
||||
kv = this.storeHeap.peek();
|
||||
if (kv == null) {
|
||||
return false;
|
||||
}
|
||||
byte [] row = kv.getRow();
|
||||
if (filterCurrentRow && Bytes.equals(currentRow, row)) {
|
||||
// filter all columns until row changes
|
||||
this.storeHeap.next(results);
|
||||
results.clear();
|
||||
continue;
|
||||
}
|
||||
// see if current row should be filtered based on row key
|
||||
if ((filter != null && filter.filterRowKey(row, 0, row.length)) ||
|
||||
(oldFilter != null && oldFilter.filterRowKey(row, 0, row.length))) {
|
||||
this.storeHeap.next(results);
|
||||
results.clear();
|
||||
resetFilters();
|
||||
filterCurrentRow = true;
|
||||
currentRow = row;
|
||||
continue;
|
||||
}
|
||||
if(!Bytes.equals(currentRow, row)) {
|
||||
// Continue on the next row:
|
||||
currentRow = row;
|
||||
filterCurrentRow = false;
|
||||
// See if we passed stopRow
|
||||
if(stopRow != null &&
|
||||
comparator.compareRows(stopRow, 0, stopRow.length,
|
||||
currentRow, 0, currentRow.length) <= 0) {
|
||||
return false;
|
||||
}
|
||||
// if there are _no_ results or current row should be filtered
|
||||
if (results.isEmpty() || filter != null && filter.filterRow()) {
|
||||
// make sure results is empty
|
||||
results.clear();
|
||||
resetFilters();
|
||||
// Next row:
|
||||
|
||||
// what happens if there are _no_ results:
|
||||
if (results.isEmpty()) {
|
||||
// Continue on the next row:
|
||||
currentRow = row;
|
||||
|
||||
// But did we pass the stop row?
|
||||
if (stopRow != null &&
|
||||
comparator.compareRows(stopRow, 0, stopRow.length,
|
||||
currentRow, 0, currentRow.length) <= 0) {
|
||||
return false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -325,6 +325,7 @@ public class QueryMatcher {
|
|||
public void reset() {
|
||||
this.deletes.reset();
|
||||
this.columns.reset();
|
||||
if (this.filter != null) this.filter.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -114,6 +114,16 @@ public class ScanQueryMatcher extends QueryMatcher {
|
|||
if (this.stickyNextRow)
|
||||
return MatchCode.SEEK_NEXT_ROW;
|
||||
|
||||
// Give the row filter a chance to do it's job.
|
||||
if (filter != null && filter.filterRowKey(bytes, offset, rowLength)) {
|
||||
stickyNextRow = true; // optimize to keep from calling the filter too much.
|
||||
return MatchCode.SEEK_NEXT_ROW;
|
||||
} else if (oldFilter != null && oldFilter.filterRowKey(bytes, offset, rowLength)) {
|
||||
stickyNextRow = true;
|
||||
return MatchCode.SEEK_NEXT_ROW;
|
||||
}
|
||||
|
||||
|
||||
if (this.columns.done()) {
|
||||
stickyNextRow = true;
|
||||
return MatchCode.SEEK_NEXT_ROW;
|
||||
|
@ -189,6 +199,16 @@ public class ScanQueryMatcher extends QueryMatcher {
|
|||
return MatchCode.SEEK_NEXT_ROW;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the row was otherwise going to be included, call this to last-minute
|
||||
* check.
|
||||
*
|
||||
* @return <code>true</code> if the row should be filtered.
|
||||
*/
|
||||
public boolean filterEntireRow() {
|
||||
return filter == null? false: filter.filterRow();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set current row
|
||||
* @param row
|
||||
|
@ -203,5 +223,7 @@ public class ScanQueryMatcher extends QueryMatcher {
|
|||
public void reset() {
|
||||
super.reset();
|
||||
stickyNextRow = false;
|
||||
if (filter != null)
|
||||
filter.reset();
|
||||
}
|
||||
}
|
|
@ -162,12 +162,20 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
continue;
|
||||
|
||||
case DONE:
|
||||
if (matcher.filterEntireRow()) {
|
||||
// nuke all results, and then return.
|
||||
results.clear();
|
||||
}
|
||||
|
||||
// copy jazz
|
||||
outResult.addAll(results);
|
||||
return true;
|
||||
|
||||
case DONE_SCAN:
|
||||
if (matcher.filterEntireRow()) {
|
||||
// nuke all results, and then return.
|
||||
results.clear();
|
||||
}
|
||||
close();
|
||||
|
||||
// copy jazz
|
||||
|
@ -194,6 +202,11 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
throw new RuntimeException("UNEXPECTED");
|
||||
}
|
||||
}
|
||||
|
||||
if (matcher.filterEntireRow()) {
|
||||
// nuke all results, and then return.
|
||||
results.clear();
|
||||
}
|
||||
|
||||
if (!results.isEmpty()) {
|
||||
// copy jazz
|
||||
|
|
|
@ -38,14 +38,6 @@ import org.apache.hadoop.hbase.client.Get;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
|
||||
import org.apache.hadoop.hbase.filter.InclusiveStopRowFilter;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
import org.apache.hadoop.hbase.filter.PrefixRowFilter;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
|
||||
import org.apache.hadoop.hbase.io.hfile.Compression;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
@ -117,7 +109,7 @@ public class TestScanner extends HBaseTestCase {
|
|||
count++;
|
||||
}
|
||||
s.close();
|
||||
assertEquals(0, count);
|
||||
assertEquals(1, count);
|
||||
// Now do something a bit more imvolved.
|
||||
scan = new Scan(startrow, stoprow);
|
||||
scan.addFamily(HConstants.CATALOG_FAMILY);
|
||||
|
@ -144,69 +136,6 @@ public class TestScanner extends HBaseTestCase {
|
|||
shutdownDfs(this.cluster);
|
||||
}
|
||||
}
|
||||
|
||||
void rowPrefixFilter(Scan scan) throws IOException {
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
scan.addFamily(HConstants.CATALOG_FAMILY);
|
||||
InternalScanner s = r.getScanner(scan);
|
||||
boolean hasMore = true;
|
||||
while (hasMore) {
|
||||
hasMore = s.next(results);
|
||||
for (KeyValue kv : results) {
|
||||
assertEquals((byte)'a', kv.getRow()[0]);
|
||||
assertEquals((byte)'b', kv.getRow()[1]);
|
||||
}
|
||||
results.clear();
|
||||
}
|
||||
s.close();
|
||||
}
|
||||
|
||||
void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
scan.addFamily(HConstants.CATALOG_FAMILY);
|
||||
InternalScanner s = r.getScanner(scan);
|
||||
boolean hasMore = true;
|
||||
while (hasMore) {
|
||||
hasMore = s.next(results);
|
||||
for (KeyValue kv : results) {
|
||||
assertTrue(Bytes.compareTo(kv.getRow(), stopRow) <= 0);
|
||||
}
|
||||
results.clear();
|
||||
}
|
||||
s.close();
|
||||
}
|
||||
|
||||
public void testFilters() throws IOException {
|
||||
try {
|
||||
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
|
||||
addContent(this.r, HConstants.CATALOG_FAMILY);
|
||||
Filter newFilter = new PrefixFilter(Bytes.toBytes("ab"));
|
||||
Scan scan = new Scan();
|
||||
scan.setFilter(newFilter);
|
||||
rowPrefixFilter(scan);
|
||||
RowFilterInterface oldFilter = new PrefixRowFilter(Bytes.toBytes("ab"));
|
||||
scan = new Scan();
|
||||
scan.setOldFilter(oldFilter);
|
||||
rowPrefixFilter(scan);
|
||||
|
||||
byte[] stopRow = Bytes.toBytes("bbc");
|
||||
newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
|
||||
scan = new Scan();
|
||||
scan.setFilter(newFilter);
|
||||
rowInclusiveStopFilter(scan, stopRow);
|
||||
|
||||
oldFilter = new WhileMatchRowFilter(
|
||||
new InclusiveStopRowFilter(stopRow));
|
||||
scan = new Scan();
|
||||
scan.setOldFilter(oldFilter);
|
||||
rowInclusiveStopFilter(scan, stopRow);
|
||||
|
||||
} finally {
|
||||
this.r.close();
|
||||
this.r.getLog().closeAndDelete();
|
||||
shutdownDfs(this.cluster);
|
||||
}
|
||||
}
|
||||
|
||||
/** The test!
|
||||
* @throws IOException
|
||||
|
@ -387,6 +316,7 @@ public class TestScanner extends HBaseTestCase {
|
|||
String server = Bytes.toString(val);
|
||||
assertEquals(0, server.compareTo(serverName));
|
||||
}
|
||||
results.clear();
|
||||
}
|
||||
} finally {
|
||||
InternalScanner s = scanner;
|
||||
|
|
|
@ -20,23 +20,25 @@
|
|||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueTestUtil;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
|
||||
import org.apache.hadoop.hbase.filter.*;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueTestUtil;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
public class TestStoreScanner extends TestCase {
|
||||
|
||||
final byte [] CF = Bytes.toBytes("cf");
|
||||
|
||||
|
||||
/**
|
||||
* Test utility for building a NavigableSet for scanners.
|
||||
* @param strCols
|
||||
|
@ -65,9 +67,9 @@ public class TestStoreScanner extends TestCase {
|
|||
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
|
||||
// this only uses maxVersions (default=1) and TimeRange (default=all)
|
||||
StoreScanner scan =
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
|
||||
KeyValue.COMPARATOR, getCols("a"),
|
||||
scanners);
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
|
||||
KeyValue.COMPARATOR, getCols("a"),
|
||||
scanners);
|
||||
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(true, scan.next(results));
|
||||
|
@ -96,9 +98,9 @@ public class TestStoreScanner extends TestCase {
|
|||
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
|
||||
// this only uses maxVersions (default=1) and TimeRange (default=all)
|
||||
StoreScanner scan =
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
|
||||
KeyValue.COMPARATOR, getCols("a"),
|
||||
scanners);
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
|
||||
KeyValue.COMPARATOR, getCols("a"),
|
||||
scanners);
|
||||
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
scan.next(results);
|
||||
|
@ -128,8 +130,8 @@ public class TestStoreScanner extends TestCase {
|
|||
};
|
||||
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
|
||||
StoreScanner scan =
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a"), scanners);
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a"), scanners);
|
||||
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertFalse(scan.next(results));
|
||||
|
@ -151,9 +153,9 @@ public class TestStoreScanner extends TestCase {
|
|||
};
|
||||
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
|
||||
StoreScanner scan =
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a"), scanners);
|
||||
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a"), scanners);
|
||||
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(true, scan.next(results));
|
||||
assertEquals(0, results.size());
|
||||
|
@ -181,8 +183,8 @@ public class TestStoreScanner extends TestCase {
|
|||
new KeyValueScanFixture(KeyValue.COMPARATOR, kvs2)
|
||||
};
|
||||
StoreScanner scan =
|
||||
new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a"), scanners);
|
||||
new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a"), scanners);
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
// the two put at ts=now will be masked by the 1 delete, and
|
||||
// since the scan default returns 1 version we'll return the newest
|
||||
|
@ -209,8 +211,8 @@ public class TestStoreScanner extends TestCase {
|
|||
};
|
||||
Scan scanSpec = new Scan(Bytes.toBytes("R1")).setMaxVersions(2);
|
||||
StoreScanner scan =
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a"), scanners);
|
||||
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a"), scanners);
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(true, scan.next(results));
|
||||
assertEquals(2, results.size());
|
||||
|
@ -219,17 +221,17 @@ public class TestStoreScanner extends TestCase {
|
|||
}
|
||||
|
||||
public void testWildCardOneVersionScan() throws IOException {
|
||||
KeyValue [] kvs = new KeyValue [] {
|
||||
KeyValueTestUtil.create("R1", "cf", "a", 2, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R1", "cf", "b", 1, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.DeleteColumn, "dont-care"),
|
||||
};
|
||||
KeyValue [] kvs = new KeyValue [] {
|
||||
KeyValueTestUtil.create("R1", "cf", "a", 2, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R1", "cf", "b", 1, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.DeleteColumn, "dont-care"),
|
||||
};
|
||||
KeyValueScanner [] scanners = new KeyValueScanner[] {
|
||||
new KeyValueScanFixture(KeyValue.COMPARATOR, kvs)
|
||||
};
|
||||
StoreScanner scan =
|
||||
new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
null, scanners);
|
||||
new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
null, scanners);
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(true, scan.next(results));
|
||||
assertEquals(2, results.size());
|
||||
|
@ -259,8 +261,8 @@ public class TestStoreScanner extends TestCase {
|
|||
new KeyValueScanFixture(KeyValue.COMPARATOR, kvs)
|
||||
};
|
||||
StoreScanner scan =
|
||||
new StoreScanner(new Scan().setMaxVersions(2), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
null, scanners);
|
||||
new StoreScanner(new Scan().setMaxVersions(2), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
null, scanners);
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(true, scan.next(results));
|
||||
assertEquals(5, results.size());
|
||||
|
@ -289,8 +291,8 @@ public class TestStoreScanner extends TestCase {
|
|||
new KeyValueScanFixture(KeyValue.COMPARATOR, kvs)
|
||||
};
|
||||
StoreScanner scan =
|
||||
new StoreScanner(new Scan().setMaxVersions(Integer.MAX_VALUE), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
null, scanners);
|
||||
new StoreScanner(new Scan().setMaxVersions(Integer.MAX_VALUE), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
null, scanners);
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(true, scan.next(results));
|
||||
assertEquals(0, results.size());
|
||||
|
@ -312,8 +314,8 @@ public class TestStoreScanner extends TestCase {
|
|||
new KeyValueScanFixture(KeyValue.COMPARATOR, kvs),
|
||||
};
|
||||
StoreScanner scan =
|
||||
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
null, scanners);
|
||||
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
null, scanners);
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(true, scan.next(results));
|
||||
assertEquals(1, results.size());
|
||||
|
@ -337,9 +339,9 @@ public class TestStoreScanner extends TestCase {
|
|||
new KeyValueScanFixture(KeyValue.COMPARATOR, kvs)
|
||||
};
|
||||
StoreScanner scan =
|
||||
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a", "d"), scanners);
|
||||
|
||||
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
|
||||
getCols("a", "d"), scanners);
|
||||
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(true, scan.next(results));
|
||||
assertEquals(2, results.size());
|
||||
|
@ -350,8 +352,156 @@ public class TestStoreScanner extends TestCase {
|
|||
assertEquals(true, scan.next(results));
|
||||
assertEquals(1, results.size());
|
||||
assertEquals(kvs[kvs.length-1], results.get(0));
|
||||
|
||||
|
||||
results.clear();
|
||||
assertEquals(false, scan.next(results));
|
||||
}
|
||||
|
||||
KeyValue [] stdKvs = new KeyValue[] {
|
||||
KeyValueTestUtil.create("R:1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 9...
|
||||
KeyValueTestUtil.create("R:2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:2", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:2", "cf", "c", 10, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 12...
|
||||
KeyValueTestUtil.create("R:3", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:3", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:3", "cf", "c", 10, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 15 ...
|
||||
KeyValueTestUtil.create("R:4", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:4", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:4", "cf", "c", 10, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 18 ..
|
||||
KeyValueTestUtil.create("R:5", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:5", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 20...
|
||||
KeyValueTestUtil.create("R:6", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:6", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 22...
|
||||
KeyValueTestUtil.create("R:7", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:7", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 24...
|
||||
KeyValueTestUtil.create("R:8", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
KeyValueTestUtil.create("R:8", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 26 ..
|
||||
KeyValueTestUtil.create("RA:1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 27...
|
||||
KeyValueTestUtil.create("RA:2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
|
||||
// 28..
|
||||
KeyValueTestUtil.create("RA:3", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
|
||||
};
|
||||
private StoreScanner getTestScanner(Scan s, NavigableSet<byte[]> cols) {
|
||||
KeyValueScanner [] scanners = new KeyValueScanner[] {
|
||||
new KeyValueScanFixture(KeyValue.COMPARATOR, stdKvs)
|
||||
};
|
||||
|
||||
return new StoreScanner(s, CF, Long.MAX_VALUE, KeyValue.COMPARATOR, cols,
|
||||
scanners);
|
||||
}
|
||||
|
||||
|
||||
// Test new and old row prefix filters.
|
||||
public void testNewRowPrefixFilter() throws IOException {
|
||||
Filter f = new WhileMatchFilter(
|
||||
new PrefixFilter(Bytes.toBytes("R:")));
|
||||
Scan s = new Scan(Bytes.toBytes("R:7"));
|
||||
s.setFilter(f);
|
||||
|
||||
rowPrefixFilter(s);
|
||||
}
|
||||
|
||||
public void testOldRowPrefixFilter() throws IOException {
|
||||
RowFilterInterface f = new WhileMatchRowFilter(
|
||||
new PrefixRowFilter(Bytes.toBytes("R:")));
|
||||
Scan s = new Scan(Bytes.toBytes("R:7"));
|
||||
s.setOldFilter(f);
|
||||
|
||||
rowPrefixFilter(s);
|
||||
|
||||
}
|
||||
public void rowPrefixFilter(Scan s) throws IOException {
|
||||
|
||||
StoreScanner scan = getTestScanner(s, null);
|
||||
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertTrue(scan.next(results));
|
||||
assertEquals(2, results.size());
|
||||
assertEquals(stdKvs[22], results.get(0));
|
||||
assertEquals(stdKvs[23], results.get(1));
|
||||
results.clear();
|
||||
|
||||
assertTrue(scan.next(results));
|
||||
assertEquals(2, results.size());
|
||||
assertEquals(stdKvs[24], results.get(0));
|
||||
assertEquals(stdKvs[25], results.get(1));
|
||||
results.clear();
|
||||
|
||||
assertFalse(scan.next(results));
|
||||
assertEquals(0, results.size());
|
||||
}
|
||||
|
||||
// Test new and old row-inclusive stop filter.
|
||||
public void testNewRowInclusiveStopFilter() throws IOException {
|
||||
Filter f = new WhileMatchFilter(new InclusiveStopFilter(Bytes.toBytes("R:3")));
|
||||
Scan scan = new Scan();
|
||||
scan.setFilter(f);
|
||||
|
||||
rowInclusiveStopFilter(scan);
|
||||
}
|
||||
|
||||
public void testOldRowInclusiveTopFilter() throws IOException {
|
||||
RowFilterInterface f = new WhileMatchRowFilter(
|
||||
new InclusiveStopRowFilter(Bytes.toBytes("R:3")));
|
||||
Scan scan = new Scan();
|
||||
scan.setOldFilter(f);
|
||||
|
||||
rowInclusiveStopFilter(scan);
|
||||
}
|
||||
|
||||
public void rowInclusiveStopFilter(Scan scan) throws IOException {
|
||||
StoreScanner s = getTestScanner(scan, getCols("a"));
|
||||
|
||||
// read crap.
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertTrue(s.next(results));
|
||||
assertEquals(1, results.size());
|
||||
assertEquals(stdKvs[0], results.get(0));
|
||||
results.clear();
|
||||
|
||||
assertTrue(s.next(results));
|
||||
assertEquals(1, results.size());
|
||||
assertEquals(stdKvs[9], results.get(0));
|
||||
results.clear();
|
||||
|
||||
assertTrue(s.next(results));
|
||||
assertEquals(1, results.size());
|
||||
assertEquals(stdKvs[12], results.get(0));
|
||||
results.clear();
|
||||
|
||||
// without aggressive peeking, the scanner doesnt know if the next row is good or not
|
||||
// under the affects of a filter.
|
||||
assertFalse(s.next(results));
|
||||
assertEquals(0, results.size());
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue