HBASE-1647 Filter#filterRow is called too often, filters rows it shouldn't have
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@798714 13f79535-47bb-0310-9956-ffa450edef68
parent 18111cc899
commit ba467d6712
CHANGES.txt
@@ -298,6 +298,8 @@ Release 0.20.0 - Unreleased
   HBASE-1671 HBASE-1609 broke scanners riding across splits
   HBASE-1717 Put on client-side uses passed-in byte[]s rather than always
              using copies
   HBASE-1647 Filter#filterRow is called too often, filters rows it shouldn't
              have (Doğacan Güney via Ryan Rawson and Stack)

  IMPROVEMENTS
   HBASE-1089 Add count of regions on filesystem to master UI; add percentage
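For orientation before the code hunks: the row-filter contract this change enforces is that a filter's row-level hook runs once per completed row, not once per KeyValue. Below is a minimal, self-contained sketch of that call sequence. It uses a simplified stand-in interface rather than HBase's real Filter API; RowLevelFilter and SketchScanner are illustrative names only and are not part of this commit.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration only; not the real HBase 0.20 API.
interface RowLevelFilter {
  boolean filterRowKey(byte[] row);    // true = skip the whole row up front
  boolean filterKeyValue(byte[] cell); // true = drop this cell
  boolean filterRow();                 // true = drop the accumulated row
  void reset();                        // called between rows
}

class SketchScanner {
  // Drives the filter with the intended per-row discipline.
  static List<byte[]> nextRow(List<byte[]> cells, byte[] row, RowLevelFilter f) {
    List<byte[]> results = new ArrayList<byte[]>();
    if (f.filterRowKey(row)) {         // decided once, from the row key
      f.reset();
      return results;                  // empty list: row excluded up front
    }
    for (byte[] cell : cells) {
      if (!f.filterKeyValue(cell)) {
        results.add(cell);
      }
    }
    if (f.filterRow()) {               // consulted once, after the row is complete
      results.clear();
    }
    f.reset();
    return results;
  }
}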
HRegion.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -1226,7 +1228,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
  public void put(Put put, Integer lockid, boolean writeToWAL)
  throws IOException {
    checkReadOnly();
    // validateValuesLength(put);

    // Do a rough check that we have resources to accept a write. The check is
    // 'rough' in that between the resource check and the call to obtain a
@@ -1679,8 +1680,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
  class RegionScanner implements InternalScanner {
    private final KeyValueHeap storeHeap;
    private final byte [] stopRow;
    private Filter filter;
    private RowFilterInterface oldFilter;
    private List<KeyValue> results = new ArrayList<KeyValue>();

    RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
      this.filter = scan.getFilter();
      this.oldFilter = scan.getOldFilter();
      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
        this.stopRow = null;
      } else {
@@ -1704,52 +1710,80 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
      this(scan, null);
    }

    private void resetFilters() {
      if (filter != null) {
        filter.reset();
      }
      if (oldFilter != null) {
        oldFilter.reset();
      }
    }

    /**
     * Get the next row of results from this region.
     * @param results list to append results to
     * @return true if there are more rows, false if scanner is done
     * @throws NotServerRegionException If this region is closing or closed
     */
    public boolean next(List<KeyValue> results)
    throws IOException {
    @Override
    public boolean next(List<KeyValue> outResults) throws IOException {
      if (closing.get() || closed.get()) {
        close();
        throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
          " is closing=" + closing.get() + " or closed=" + closed.get());
      }
      results.clear();
      boolean returnResult = nextInternal();
      if (!returnResult && filter != null && filter.filterRow()) {
        results.clear();
      }
      outResults.addAll(results);
      resetFilters();
      return returnResult;
    }

    private boolean nextInternal() throws IOException {
      // This method should probably be reorganized a bit... has gotten messy
      KeyValue kv = this.storeHeap.peek();
      if (kv == null) {
        return false;
      }
      byte [] currentRow = kv.getRow();
      // See if we passed stopRow
      if (stopRow != null &&
        comparator.compareRows(stopRow, 0, stopRow.length,
          currentRow, 0, currentRow.length) <= 0) {
        return false;
      }
      this.storeHeap.next(results);
      KeyValue kv;
      byte[] currentRow = null;
      boolean filterCurrentRow = false;
      while (true) {
        kv = this.storeHeap.peek();
        if (kv == null) {
          return false;
        }
        byte [] row = kv.getRow();
        if (filterCurrentRow && Bytes.equals(currentRow, row)) {
          // filter all columns until row changes
          this.storeHeap.next(results);
          results.clear();
          continue;
        }
        // see if current row should be filtered based on row key
        if ((filter != null && filter.filterRowKey(row, 0, row.length)) ||
            (oldFilter != null && oldFilter.filterRowKey(row, 0, row.length))) {
          this.storeHeap.next(results);
          results.clear();
          resetFilters();
          filterCurrentRow = true;
          currentRow = row;
          continue;
        }
        if(!Bytes.equals(currentRow, row)) {
          // Next row:

          // what happens if there are _no_ results:
          if (results.isEmpty()) {
            // Continue on the next row:
            currentRow = row;

            // But did we pass the stop row?
          filterCurrentRow = false;
          // See if we passed stopRow
          if(stopRow != null &&
              comparator.compareRows(stopRow, 0, stopRow.length,
                currentRow, 0, currentRow.length) <= 0) {
            return false;
          }
          // if there are _no_ results or current row should be filtered
          if (results.isEmpty() || filter != null && filter.filterRow()) {
            // make sure results is empty
            results.clear();
            resetFilters();
            continue;
          }
          return true;
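Callers drain the rewritten RegionScanner through InternalScanner#next the same way the tests further down do: each call hands back at most one row that has already been row-filtered, and a row the filter rejects simply arrives as an empty list. A rough usage sketch follows, assuming the 0.20 package locations; ScannerDrainSketch and countRows are illustrative names, not part of this commit.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.InternalScanner;

class ScannerDrainSketch {
  // Counts the rows an InternalScanner hands back; a row rejected by the
  // filter comes back as an empty list and is therefore not counted.
  static int countRows(InternalScanner scanner) throws IOException {
    List<KeyValue> row = new ArrayList<KeyValue>();
    int rows = 0;
    boolean more;
    do {
      more = scanner.next(row);
      if (!row.isEmpty()) {
        rows++;
      }
      row.clear();
    } while (more);
    scanner.close();
    return rows;
  }
}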
QueryMatcher.java
@@ -325,7 +325,6 @@ public class QueryMatcher {
  public void reset() {
    this.deletes.reset();
    this.columns.reset();
    if (this.filter != null) this.filter.reset();
  }

  /**
ScanQueryMatcher.java
@@ -114,16 +114,6 @@ public class ScanQueryMatcher extends QueryMatcher {
    if (this.stickyNextRow)
      return MatchCode.SEEK_NEXT_ROW;

    // Give the row filter a chance to do it's job.
    if (filter != null && filter.filterRowKey(bytes, offset, rowLength)) {
      stickyNextRow = true; // optimize to keep from calling the filter too much.
      return MatchCode.SEEK_NEXT_ROW;
    } else if (oldFilter != null && oldFilter.filterRowKey(bytes, offset, rowLength)) {
      stickyNextRow = true;
      return MatchCode.SEEK_NEXT_ROW;
    }


    if (this.columns.done()) {
      stickyNextRow = true;
      return MatchCode.SEEK_NEXT_ROW;
@@ -199,16 +189,6 @@ public class ScanQueryMatcher extends QueryMatcher {
      return MatchCode.SEEK_NEXT_ROW;
  }

  /**
   * If the row was otherwise going to be included, call this to last-minute
   * check.
   *
   * @return <code>true</code> if the row should be filtered.
   */
  public boolean filterEntireRow() {
    return filter == null? false: filter.filterRow();
  }

  /**
   * Set current row
   * @param row
@@ -223,7 +203,5 @@ public class ScanQueryMatcher extends QueryMatcher {
  public void reset() {
    super.reset();
    stickyNextRow = false;
    if (filter != null)
      filter.reset();
  }
}
StoreScanner.java
@@ -162,20 +162,12 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
          continue;

        case DONE:
          if (matcher.filterEntireRow()) {
            // nuke all results, and then return.
            results.clear();
          }

          // copy jazz
          outResult.addAll(results);
          return true;

        case DONE_SCAN:
          if (matcher.filterEntireRow()) {
            // nuke all results, and then return.
            results.clear();
          }
          close();

          // copy jazz
@@ -203,11 +195,6 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
        }
      }

      if (matcher.filterEntireRow()) {
        // nuke all results, and then return.
        results.clear();
      }

      if (!results.isEmpty()) {
        // copy jazz
        outResult.addAll(results);
TestScanner.java
@@ -38,6 +38,14 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.filter.InclusiveStopRowFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.PrefixRowFilter;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
@@ -109,7 +117,7 @@ public class TestScanner extends HBaseTestCase {
        count++;
      }
      s.close();
      assertEquals(1, count);
      assertEquals(0, count);
      // Now do something a bit more imvolved.
      scan = new Scan(startrow, stoprow);
      scan.addFamily(HConstants.CATALOG_FAMILY);
@@ -137,6 +145,69 @@ public class TestScanner extends HBaseTestCase {
      }
    }

  void rowPrefixFilter(Scan scan) throws IOException {
    List<KeyValue> results = new ArrayList<KeyValue>();
    scan.addFamily(HConstants.CATALOG_FAMILY);
    InternalScanner s = r.getScanner(scan);
    boolean hasMore = true;
    while (hasMore) {
      hasMore = s.next(results);
      for (KeyValue kv : results) {
        assertEquals((byte)'a', kv.getRow()[0]);
        assertEquals((byte)'b', kv.getRow()[1]);
      }
      results.clear();
    }
    s.close();
  }

  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
    List<KeyValue> results = new ArrayList<KeyValue>();
    scan.addFamily(HConstants.CATALOG_FAMILY);
    InternalScanner s = r.getScanner(scan);
    boolean hasMore = true;
    while (hasMore) {
      hasMore = s.next(results);
      for (KeyValue kv : results) {
        assertTrue(Bytes.compareTo(kv.getRow(), stopRow) <= 0);
      }
      results.clear();
    }
    s.close();
  }

  public void testFilters() throws IOException {
    try {
      this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
      addContent(this.r, HConstants.CATALOG_FAMILY);
      Filter newFilter = new PrefixFilter(Bytes.toBytes("ab"));
      Scan scan = new Scan();
      scan.setFilter(newFilter);
      rowPrefixFilter(scan);
      RowFilterInterface oldFilter = new PrefixRowFilter(Bytes.toBytes("ab"));
      scan = new Scan();
      scan.setOldFilter(oldFilter);
      rowPrefixFilter(scan);

      byte[] stopRow = Bytes.toBytes("bbc");
      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
      scan = new Scan();
      scan.setFilter(newFilter);
      rowInclusiveStopFilter(scan, stopRow);

      oldFilter = new WhileMatchRowFilter(
          new InclusiveStopRowFilter(stopRow));
      scan = new Scan();
      scan.setOldFilter(oldFilter);
      rowInclusiveStopFilter(scan, stopRow);

    } finally {
      this.r.close();
      this.r.getLog().closeAndDelete();
      shutdownDfs(this.cluster);
    }
  }

  /** The test!
   * @throws IOException
   */
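The testFilters test above drives both the new Filter API and the deprecated RowFilterInterface path against the same region. From a client's point of view the two set-ups look roughly like this (a sketch; FilterScanSketch and its method names are illustrative, and the "ab" prefix is simply the value the test happens to use):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.PrefixRowFilter;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.util.Bytes;

class FilterScanSketch {
  // New-style API: a Filter attached with setFilter().
  static Scan newStyle() {
    Scan scan = new Scan();
    scan.setFilter(new PrefixFilter(Bytes.toBytes("ab")));
    return scan;
  }

  // Deprecated path: a RowFilterInterface attached with setOldFilter().
  static Scan oldStyle() {
    Scan scan = new Scan();
    RowFilterInterface old = new PrefixRowFilter(Bytes.toBytes("ab"));
    scan.setOldFilter(old);
    return scan;
  }
}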
@@ -316,7 +387,6 @@ public class TestScanner extends HBaseTestCase {
          String server = Bytes.toString(val);
          assertEquals(0, server.compareTo(serverName));
        }
        results.clear();
      }
    } finally {
      InternalScanner s = scanner;
TestStoreScanner.java
@@ -20,20 +20,18 @@

package org.apache.hadoop.hbase.regionserver;

import junit.framework.TestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

import junit.framework.TestCase;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class TestStoreScanner extends TestCase {
@@ -356,152 +354,4 @@ public class TestStoreScanner extends TestCase {
    results.clear();
    assertEquals(false, scan.next(results));
  }

  KeyValue [] stdKvs = new KeyValue[] {
    KeyValueTestUtil.create("R:1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),

    // 9...
    KeyValueTestUtil.create("R:2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:2", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:2", "cf", "c", 10, KeyValue.Type.Put, "dont-care"),

    // 12...
    KeyValueTestUtil.create("R:3", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:3", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:3", "cf", "c", 10, KeyValue.Type.Put, "dont-care"),

    // 15 ...
    KeyValueTestUtil.create("R:4", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:4", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:4", "cf", "c", 10, KeyValue.Type.Put, "dont-care"),

    // 18 ..
    KeyValueTestUtil.create("R:5", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:5", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),

    // 20...
    KeyValueTestUtil.create("R:6", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:6", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),

    // 22...
    KeyValueTestUtil.create("R:7", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:7", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),

    // 24...
    KeyValueTestUtil.create("R:8", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
    KeyValueTestUtil.create("R:8", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),

    // 26 ..
    KeyValueTestUtil.create("RA:1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),

    // 27...
    KeyValueTestUtil.create("RA:2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),

    // 28..
    KeyValueTestUtil.create("RA:3", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
  };
  private StoreScanner getTestScanner(Scan s, NavigableSet<byte[]> cols) {
    KeyValueScanner [] scanners = new KeyValueScanner[] {
      new KeyValueScanFixture(KeyValue.COMPARATOR, stdKvs)
    };

    return new StoreScanner(s, CF, Long.MAX_VALUE, KeyValue.COMPARATOR, cols,
      scanners);
  }


  // Test new and old row prefix filters.
  public void testNewRowPrefixFilter() throws IOException {
    Filter f = new WhileMatchFilter(
      new PrefixFilter(Bytes.toBytes("R:")));
    Scan s = new Scan(Bytes.toBytes("R:7"));
    s.setFilter(f);

    rowPrefixFilter(s);
  }

  public void testOldRowPrefixFilter() throws IOException {
    RowFilterInterface f = new WhileMatchRowFilter(
      new PrefixRowFilter(Bytes.toBytes("R:")));
    Scan s = new Scan(Bytes.toBytes("R:7"));
    s.setOldFilter(f);

    rowPrefixFilter(s);

  }
  public void rowPrefixFilter(Scan s) throws IOException {

    StoreScanner scan = getTestScanner(s, null);

    List<KeyValue> results = new ArrayList<KeyValue>();
    assertTrue(scan.next(results));
    assertEquals(2, results.size());
    assertEquals(stdKvs[22], results.get(0));
    assertEquals(stdKvs[23], results.get(1));
    results.clear();

    assertTrue(scan.next(results));
    assertEquals(2, results.size());
    assertEquals(stdKvs[24], results.get(0));
    assertEquals(stdKvs[25], results.get(1));
    results.clear();

    assertFalse(scan.next(results));
    assertEquals(0, results.size());
  }

  // Test new and old row-inclusive stop filter.
  public void testNewRowInclusiveStopFilter() throws IOException {
    Filter f = new WhileMatchFilter(new InclusiveStopFilter(Bytes.toBytes("R:3")));
    Scan scan = new Scan();
    scan.setFilter(f);

    rowInclusiveStopFilter(scan);
  }

  public void testOldRowInclusiveTopFilter() throws IOException {
    RowFilterInterface f = new WhileMatchRowFilter(
      new InclusiveStopRowFilter(Bytes.toBytes("R:3")));
    Scan scan = new Scan();
    scan.setOldFilter(f);

    rowInclusiveStopFilter(scan);
  }

  public void rowInclusiveStopFilter(Scan scan) throws IOException {
    StoreScanner s = getTestScanner(scan, getCols("a"));

    // read crap.
    List<KeyValue> results = new ArrayList<KeyValue>();
    assertTrue(s.next(results));
    assertEquals(1, results.size());
    assertEquals(stdKvs[0], results.get(0));
    results.clear();

    assertTrue(s.next(results));
    assertEquals(1, results.size());
    assertEquals(stdKvs[9], results.get(0));
    results.clear();

    assertTrue(s.next(results));
    assertEquals(1, results.size());
    assertEquals(stdKvs[12], results.get(0));
    results.clear();

    // without aggressive peeking, the scanner doesnt know if the next row is good or not
    // under the affects of a filter.
    assertFalse(s.next(results));
    assertEquals(0, results.size());
  }


}