HBASE-9079 FilterList getNextKeyHint skips rows that should be included in the results (Viral Bajaria and LarsH)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1512018 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d28f8564ad
commit
c8530f81d4
|
@ -48,7 +48,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
|
|||
*
|
||||
* <br/>
|
||||
* Defaults to {@link Operator#MUST_PASS_ALL}.
|
||||
* <p>TODO: Fix creation of Configuration on serialization and deserialization.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
|
@ -64,6 +63,7 @@ public class FilterList extends Filter {
|
|||
private static final int MAX_LOG_FILTERS = 5;
|
||||
private Operator operator = Operator.MUST_PASS_ALL;
|
||||
private List<Filter> filters = new ArrayList<Filter>();
|
||||
private Filter seekHintFilter = null;
|
||||
|
||||
/** Reference KeyValue used by {@link #transform(KeyValue)} for validation purpose. */
|
||||
private KeyValue referenceKV = null;
|
||||
|
@ -163,6 +163,7 @@ public class FilterList extends Filter {
|
|||
for (Filter filter : filters) {
|
||||
filter.reset();
|
||||
}
|
||||
seekHintFilter = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -232,6 +233,9 @@ public class FilterList extends Filter {
|
|||
case INCLUDE:
|
||||
transformed = filter.transform(transformed);
|
||||
continue;
|
||||
case SEEK_NEXT_USING_HINT:
|
||||
seekHintFilter = filter;
|
||||
return code;
|
||||
default:
|
||||
return code;
|
||||
}
|
||||
|
@ -349,22 +353,28 @@ public class FilterList extends Filter {
|
|||
* @return true if and only if the fields of the filter that are serialized
|
||||
* are equal to the corresponding fields in other. Used for testing.
|
||||
*/
|
||||
boolean areSerializedFieldsEqual(Filter o) {
|
||||
if (o == this) return true;
|
||||
if (!(o instanceof FilterList)) return false;
|
||||
boolean areSerializedFieldsEqual(Filter other) {
|
||||
if (other == this) return true;
|
||||
if (!(other instanceof FilterList)) return false;
|
||||
|
||||
FilterList other = (FilterList)o;
|
||||
return this.getOperator().equals(other.getOperator()) &&
|
||||
((this.getFilters() == other.getFilters())
|
||||
|| this.getFilters().equals(other.getFilters()));
|
||||
FilterList o = (FilterList)other;
|
||||
return this.getOperator().equals(o.getOperator()) &&
|
||||
((this.getFilters() == o.getFilters())
|
||||
|| this.getFilters().equals(o.getFilters()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
|
||||
KeyValue keyHint = null;
|
||||
if (operator == Operator.MUST_PASS_ALL) {
|
||||
keyHint = seekHintFilter.getNextKeyHint(currentKV);
|
||||
return keyHint;
|
||||
}
|
||||
|
||||
// If any condition can pass, we need to keep the min hint
|
||||
for (Filter filter : filters) {
|
||||
KeyValue curKeyHint = filter.getNextKeyHint(currentKV);
|
||||
if (curKeyHint == null && operator == Operator.MUST_PASS_ONE) {
|
||||
if (curKeyHint == null) {
|
||||
// If we ever don't have a hint and this is must-pass-one, then no hint
|
||||
return null;
|
||||
}
|
||||
|
@ -374,14 +384,7 @@ public class FilterList extends Filter {
|
|||
keyHint = curKeyHint;
|
||||
continue;
|
||||
}
|
||||
// There is an existing hint
|
||||
if (operator == Operator.MUST_PASS_ALL &&
|
||||
KeyValue.COMPARATOR.compare(keyHint, curKeyHint) < 0) {
|
||||
// If all conditions must pass, we can keep the max hint
|
||||
keyHint = curKeyHint;
|
||||
} else if (operator == Operator.MUST_PASS_ONE &&
|
||||
KeyValue.COMPARATOR.compare(keyHint, curKeyHint) > 0) {
|
||||
// If any condition can pass, we need to keep the min hint
|
||||
if (KeyValue.COMPARATOR.compare(keyHint, curKeyHint) > 0) {
|
||||
keyHint = curKeyHint;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -351,6 +351,11 @@ public class TestFilterList {
|
|||
};
|
||||
|
||||
Filter filterMinHint = new FilterBase() {
|
||||
@Override
|
||||
public ReturnCode filterKeyValue(KeyValue ignored) {
|
||||
return ReturnCode.SEEK_NEXT_USING_HINT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValue getNextKeyHint(KeyValue currentKV) {
|
||||
return minKeyValue;
|
||||
|
@ -361,6 +366,11 @@ public class TestFilterList {
|
|||
};
|
||||
|
||||
Filter filterMaxHint = new FilterBase() {
|
||||
@Override
|
||||
public ReturnCode filterKeyValue(KeyValue ignored) {
|
||||
return ReturnCode.SEEK_NEXT_USING_HINT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValue getNextKeyHint(KeyValue currentKV) {
|
||||
return new KeyValue(Bytes.toBytes(Long.MAX_VALUE), null, null);
|
||||
|
@ -395,30 +405,34 @@ public class TestFilterList {
|
|||
|
||||
// MUST PASS ALL
|
||||
|
||||
// Should take the max if given two hints
|
||||
// Should take the first hint
|
||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||
Arrays.asList(new Filter [] { filterMinHint, filterMaxHint } ));
|
||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextKeyHint(null),
|
||||
maxKeyValue));
|
||||
|
||||
// Should have max hint even if a filter has no hint
|
||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||
Arrays.asList(
|
||||
new Filter [] { filterMinHint, filterMaxHint, filterNoHint } ));
|
||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextKeyHint(null),
|
||||
maxKeyValue));
|
||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||
Arrays.asList(new Filter [] { filterNoHint, filterMaxHint } ));
|
||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextKeyHint(null),
|
||||
maxKeyValue));
|
||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||
Arrays.asList(new Filter [] { filterNoHint, filterMinHint } ));
|
||||
filterList.filterKeyValue(null);
|
||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextKeyHint(null),
|
||||
minKeyValue));
|
||||
|
||||
// Should give min hint if its the only one
|
||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||
Arrays.asList(new Filter [] { filterMaxHint, filterMinHint } ));
|
||||
filterList.filterKeyValue(null);
|
||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextKeyHint(null),
|
||||
maxKeyValue));
|
||||
|
||||
// Should have first hint even if a filter has no hint
|
||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||
Arrays.asList(
|
||||
new Filter [] { filterNoHint, filterMinHint, filterMaxHint } ));
|
||||
filterList.filterKeyValue(null);
|
||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextKeyHint(null),
|
||||
minKeyValue));
|
||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||
Arrays.asList(new Filter [] { filterNoHint, filterMaxHint } ));
|
||||
filterList.filterKeyValue(null);
|
||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextKeyHint(null),
|
||||
maxKeyValue));
|
||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||
Arrays.asList(new Filter [] { filterNoHint, filterMinHint } ));
|
||||
filterList.filterKeyValue(null);
|
||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextKeyHint(null),
|
||||
minKeyValue));
|
||||
}
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
/**
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.filter;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueTestUtil;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.junit.*;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestFuzzyRowAndColumnRangeFilter {
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private final Log LOG = LogFactory.getLog(this.getClass());
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void Test() throws Exception {
|
||||
String cf = "f";
|
||||
String table = "TestFuzzyAndColumnRangeFilterClient";
|
||||
HTable ht = TEST_UTIL.createTable(Bytes.toBytes(table),
|
||||
Bytes.toBytes(cf), Integer.MAX_VALUE);
|
||||
|
||||
// 10 byte row key - (2 bytes 4 bytes 4 bytes)
|
||||
// 4 byte qualifier
|
||||
// 4 byte value
|
||||
|
||||
for (int i1 = 0; i1 < 2; i1++) {
|
||||
for (int i2 = 0; i2 < 5; i2++) {
|
||||
byte[] rk = new byte[10];
|
||||
|
||||
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(rk);
|
||||
buf.clear();
|
||||
buf.writeShort((short) 2);
|
||||
buf.writeInt(i1);
|
||||
buf.writeInt(i2);
|
||||
|
||||
for (int c = 0; c < 5; c++) {
|
||||
byte[] cq = new byte[4];
|
||||
Bytes.putBytes(cq, 0, Bytes.toBytes(c), 0, 4);
|
||||
|
||||
Put p = new Put(rk);
|
||||
p.setDurability(Durability.SKIP_WAL);
|
||||
p.add(cf.getBytes(), cq, Bytes.toBytes(c));
|
||||
ht.put(p);
|
||||
LOG.info("Inserting: rk: " + Bytes.toStringBinary(rk) + " cq: "
|
||||
+ Bytes.toStringBinary(cq));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_UTIL.flush();
|
||||
|
||||
// test passes
|
||||
runTest(ht, 0, 10);
|
||||
|
||||
// test fails
|
||||
runTest(ht, 1, 8);
|
||||
}
|
||||
|
||||
private void runTest(HTable hTable, int cqStart, int expectedSize) throws IOException {
|
||||
// [0, 2, ?, ?, ?, ?, 0, 0, 0, 1]
|
||||
byte[] fuzzyKey = new byte[10];
|
||||
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(fuzzyKey);
|
||||
buf.clear();
|
||||
buf.writeShort((short) 2);
|
||||
for (int i = 0; i < 4; i++)
|
||||
buf.writeByte((short)63);
|
||||
buf.writeInt((short)1);
|
||||
|
||||
byte[] mask = new byte[] {0 , 0, 1, 1, 1, 1, 0, 0, 0, 0};
|
||||
|
||||
Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask);
|
||||
FuzzyRowFilter fuzzyRowFilter = new FuzzyRowFilter(Lists.newArrayList(pair));
|
||||
ColumnRangeFilter columnRangeFilter = new ColumnRangeFilter(Bytes.toBytes(cqStart), true
|
||||
, Bytes.toBytes(4), true);
|
||||
//regular test
|
||||
runScanner(hTable, expectedSize, fuzzyRowFilter, columnRangeFilter);
|
||||
//reverse filter order test
|
||||
runScanner(hTable, expectedSize, columnRangeFilter, fuzzyRowFilter);
|
||||
}
|
||||
|
||||
private void runScanner(HTable hTable, int expectedSize, Filter... filters) throws IOException {
|
||||
String cf = "f";
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(cf.getBytes());
|
||||
FilterList filterList = new FilterList(filters);
|
||||
scan.setFilter(filterList);
|
||||
|
||||
ResultScanner scanner = hTable.getScanner(scan);
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
Result result;
|
||||
long timeBeforeScan = System.currentTimeMillis();
|
||||
while ((result = scanner.next()) != null) {
|
||||
for (KeyValue kv : result.list()) {
|
||||
LOG.info("Got rk: " + Bytes.toStringBinary(kv.getRow()) + " cq: "
|
||||
+ Bytes.toStringBinary(kv.getQualifier()));
|
||||
results.add(kv);
|
||||
}
|
||||
}
|
||||
long scanTime = System.currentTimeMillis() - timeBeforeScan;
|
||||
scanner.close();
|
||||
|
||||
LOG.info("scan time = " + scanTime + "ms");
|
||||
LOG.info("found " + results.size() + " results");
|
||||
|
||||
assertEquals(expectedSize, results.size());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue