HBASE-20565 ColumnRangeFilter combined with ColumnPaginationFilter can produce incorrect result
This commit is contained in:
parent
3a97976eae
commit
0b7081e67a
|
@ -208,17 +208,20 @@ public class ColumnRangeFilter extends FilterBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param other
|
* @param o filter to serialize.
|
||||||
* @return true if and only if the fields of the filter that are serialized
|
* @return true if and only if the fields of the filter that are serialized are equal to the
|
||||||
* are equal to the corresponding fields in other. Used for testing.
|
* corresponding fields in other. Used for testing.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
boolean areSerializedFieldsEqual(Filter o) {
|
boolean areSerializedFieldsEqual(Filter o) {
|
||||||
if (o == this) return true;
|
if (o == this) {
|
||||||
if (!(o instanceof ColumnRangeFilter)) return false;
|
return true;
|
||||||
|
}
|
||||||
ColumnRangeFilter other = (ColumnRangeFilter)o;
|
if (!(o instanceof ColumnRangeFilter)) {
|
||||||
return Bytes.equals(this.getMinColumn(),other.getMinColumn())
|
return false;
|
||||||
|
}
|
||||||
|
ColumnRangeFilter other = (ColumnRangeFilter) o;
|
||||||
|
return Bytes.equals(this.getMinColumn(), other.getMinColumn())
|
||||||
&& this.getMinColumnInclusive() == other.getMinColumnInclusive()
|
&& this.getMinColumnInclusive() == other.getMinColumnInclusive()
|
||||||
&& Bytes.equals(this.getMaxColumn(), other.getMaxColumn())
|
&& Bytes.equals(this.getMaxColumn(), other.getMaxColumn())
|
||||||
&& this.getMaxColumnInclusive() == other.getMaxColumnInclusive();
|
&& this.getMaxColumnInclusive() == other.getMaxColumnInclusive();
|
||||||
|
|
|
@ -154,6 +154,11 @@ public class FilterListWithAND extends FilterListBase {
|
||||||
"Received code is not valid. rc: " + rc + ", localRC: " + localRC);
|
"Received code is not valid. rc: " + rc + ", localRC: " + localRC);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isIncludeRelatedReturnCode(ReturnCode rc) {
|
||||||
|
return isInReturnCodes(rc, ReturnCode.INCLUDE, ReturnCode.INCLUDE_AND_NEXT_COL,
|
||||||
|
ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ReturnCode filterKeyValue(Cell c) throws IOException {
|
public ReturnCode filterKeyValue(Cell c) throws IOException {
|
||||||
if (isEmpty()) {
|
if (isEmpty()) {
|
||||||
|
@ -167,10 +172,16 @@ public class FilterListWithAND extends FilterListBase {
|
||||||
return ReturnCode.NEXT_ROW;
|
return ReturnCode.NEXT_ROW;
|
||||||
}
|
}
|
||||||
ReturnCode localRC = filter.filterKeyValue(c);
|
ReturnCode localRC = filter.filterKeyValue(c);
|
||||||
rc = mergeReturnCode(rc, localRC);
|
|
||||||
if (localRC == ReturnCode.SEEK_NEXT_USING_HINT) {
|
if (localRC == ReturnCode.SEEK_NEXT_USING_HINT) {
|
||||||
seekHintFilters.add(filter);
|
seekHintFilters.add(filter);
|
||||||
}
|
}
|
||||||
|
rc = mergeReturnCode(rc, localRC);
|
||||||
|
// Only when rc is INCLUDE* case, we should pass the cell to the following sub-filters.
|
||||||
|
// otherwise we may mess up the global state (such as offset, count..) in the following
|
||||||
|
// sub-filters. (HBASE-20565)
|
||||||
|
if (!isIncludeRelatedReturnCode(rc)) {
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (!seekHintFilters.isEmpty()) {
|
if (!seekHintFilters.isEmpty()) {
|
||||||
return ReturnCode.SEEK_NEXT_USING_HINT;
|
return ReturnCode.SEEK_NEXT_USING_HINT;
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.filter;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.filter.FilterList.Operator;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -159,8 +161,8 @@ public class TestColumnRangeFilter {
|
||||||
public void TestColumnRangeFilterClient() throws Exception {
|
public void TestColumnRangeFilterClient() throws Exception {
|
||||||
String family = "Family";
|
String family = "Family";
|
||||||
String table = "TestColumnRangeFilterClient";
|
String table = "TestColumnRangeFilterClient";
|
||||||
Table ht = TEST_UTIL.createTable(TableName.valueOf(table),
|
Table ht =
|
||||||
Bytes.toBytes(family), Integer.MAX_VALUE);
|
TEST_UTIL.createTable(TableName.valueOf(table), Bytes.toBytes(family), Integer.MAX_VALUE);
|
||||||
|
|
||||||
List<String> rows = generateRandomWords(10, 8);
|
List<String> rows = generateRandomWords(10, 8);
|
||||||
long maxTimestamp = 2;
|
long maxTimestamp = 2;
|
||||||
|
@ -170,14 +172,10 @@ public class TestColumnRangeFilter {
|
||||||
|
|
||||||
Map<StringRange, List<KeyValue>> rangeMap = new HashMap<StringRange, List<KeyValue>>();
|
Map<StringRange, List<KeyValue>> rangeMap = new HashMap<StringRange, List<KeyValue>>();
|
||||||
|
|
||||||
rangeMap.put(new StringRange(null, true, "b", false),
|
rangeMap.put(new StringRange(null, true, "b", false), new ArrayList<KeyValue>());
|
||||||
new ArrayList<KeyValue>());
|
rangeMap.put(new StringRange("p", true, "q", false), new ArrayList<KeyValue>());
|
||||||
rangeMap.put(new StringRange("p", true, "q", false),
|
rangeMap.put(new StringRange("r", false, "s", true), new ArrayList<KeyValue>());
|
||||||
new ArrayList<KeyValue>());
|
rangeMap.put(new StringRange("z", false, null, false), new ArrayList<KeyValue>());
|
||||||
rangeMap.put(new StringRange("r", false, "s", true),
|
|
||||||
new ArrayList<KeyValue>());
|
|
||||||
rangeMap.put(new StringRange("z", false, null, false),
|
|
||||||
new ArrayList<KeyValue>());
|
|
||||||
String valueString = "ValueString";
|
String valueString = "ValueString";
|
||||||
|
|
||||||
for (String row : rows) {
|
for (String row : rows) {
|
||||||
|
@ -185,8 +183,7 @@ public class TestColumnRangeFilter {
|
||||||
p.setDurability(Durability.SKIP_WAL);
|
p.setDurability(Durability.SKIP_WAL);
|
||||||
for (String column : columns) {
|
for (String column : columns) {
|
||||||
for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) {
|
for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) {
|
||||||
KeyValue kv = KeyValueTestUtil.create(row, family, column, timestamp,
|
KeyValue kv = KeyValueTestUtil.create(row, family, column, timestamp, valueString);
|
||||||
valueString);
|
|
||||||
p.add(kv);
|
p.add(kv);
|
||||||
kvList.add(kv);
|
kvList.add(kv);
|
||||||
for (StringRange s : rangeMap.keySet()) {
|
for (StringRange s : rangeMap.keySet()) {
|
||||||
|
@ -205,40 +202,71 @@ public class TestColumnRangeFilter {
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
scan.setMaxVersions();
|
scan.setMaxVersions();
|
||||||
for (StringRange s : rangeMap.keySet()) {
|
for (StringRange s : rangeMap.keySet()) {
|
||||||
filter = new ColumnRangeFilter(s.getStart() == null ? null
|
filter = new ColumnRangeFilter(s.getStart() == null ? null : Bytes.toBytes(s.getStart()),
|
||||||
: Bytes.toBytes(s.getStart()), s.isStartInclusive(),
|
s.isStartInclusive(), s.getEnd() == null ? null : Bytes.toBytes(s.getEnd()),
|
||||||
s.getEnd() == null ? null : Bytes.toBytes(s.getEnd()),
|
|
||||||
s.isEndInclusive());
|
s.isEndInclusive());
|
||||||
scan.setFilter(filter);
|
scan.setFilter(filter);
|
||||||
ResultScanner scanner = ht.getScanner(scan);
|
assertEquals(rangeMap.get(s).size(), cellsCount(ht, filter));
|
||||||
List<Cell> results = new ArrayList<Cell>();
|
|
||||||
LOG.info("scan column range: " + s.toString());
|
|
||||||
long timeBeforeScan = System.currentTimeMillis();
|
|
||||||
|
|
||||||
Result result;
|
|
||||||
while ((result = scanner.next()) != null) {
|
|
||||||
for (Cell kv : result.listCells()) {
|
|
||||||
results.add(kv);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
long scanTime = System.currentTimeMillis() - timeBeforeScan;
|
|
||||||
scanner.close();
|
|
||||||
LOG.info("scan time = " + scanTime + "ms");
|
|
||||||
LOG.info("found " + results.size() + " results");
|
|
||||||
LOG.info("Expecting " + rangeMap.get(s).size() + " results");
|
|
||||||
|
|
||||||
/*
|
|
||||||
for (KeyValue kv : results) {
|
|
||||||
LOG.info("found row " + Bytes.toString(kv.getRow()) + ", column "
|
|
||||||
+ Bytes.toString(kv.getQualifier()));
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
assertEquals(rangeMap.get(s).size(), results.size());
|
|
||||||
}
|
}
|
||||||
ht.close();
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestColumnRangeFilterWithColumnPaginationFilter() throws Exception {
|
||||||
|
String family = "Family";
|
||||||
|
String table = "TestColumnRangeFilterWithColumnPaginationFilter";
|
||||||
|
try (Table ht =
|
||||||
|
TEST_UTIL.createTable(TableName.valueOf(table), Bytes.toBytes(family), Integer.MAX_VALUE)) {
|
||||||
|
// one row.
|
||||||
|
String row = "row";
|
||||||
|
// One version
|
||||||
|
long timestamp = 100;
|
||||||
|
// 10 columns
|
||||||
|
int[] columns = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
|
||||||
|
String valueString = "ValueString";
|
||||||
|
|
||||||
|
Put p = new Put(Bytes.toBytes(row));
|
||||||
|
p.setDurability(Durability.SKIP_WAL);
|
||||||
|
for (int column : columns) {
|
||||||
|
KeyValue kv =
|
||||||
|
KeyValueTestUtil.create(row, family, Integer.toString(column), timestamp, valueString);
|
||||||
|
p.add(kv);
|
||||||
|
}
|
||||||
|
ht.put(p);
|
||||||
|
|
||||||
|
TEST_UTIL.flush();
|
||||||
|
|
||||||
|
// Column range from 1 to 9.
|
||||||
|
StringRange stringRange = new StringRange("1", true, "9", false);
|
||||||
|
ColumnRangeFilter filter1 = new ColumnRangeFilter(Bytes.toBytes(stringRange.getStart()),
|
||||||
|
stringRange.isStartInclusive(), Bytes.toBytes(stringRange.getEnd()),
|
||||||
|
stringRange.isEndInclusive());
|
||||||
|
|
||||||
|
ColumnPaginationFilter filter2 = new ColumnPaginationFilter(5, 0);
|
||||||
|
ColumnPaginationFilter filter3 = new ColumnPaginationFilter(5, 1);
|
||||||
|
ColumnPaginationFilter filter4 = new ColumnPaginationFilter(5, 2);
|
||||||
|
ColumnPaginationFilter filter5 = new ColumnPaginationFilter(5, 6);
|
||||||
|
ColumnPaginationFilter filter6 = new ColumnPaginationFilter(5, 9);
|
||||||
|
assertEquals(5, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter2)));
|
||||||
|
assertEquals(5, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter3)));
|
||||||
|
assertEquals(5, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter4)));
|
||||||
|
assertEquals(2, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter5)));
|
||||||
|
assertEquals(0, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter6)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int cellsCount(Table table, Filter filter) throws IOException {
|
||||||
|
Scan scan = new Scan().setFilter(filter).setMaxVersions();
|
||||||
|
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||||
|
List<Cell> results = new ArrayList<>();
|
||||||
|
Result result;
|
||||||
|
while ((result = scanner.next()) != null) {
|
||||||
|
results.addAll(result.listCells());
|
||||||
|
}
|
||||||
|
return results.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
List<String> generateRandomWords(int numberOfWords, int maxLengthOfWords) {
|
List<String> generateRandomWords(int numberOfWords, int maxLengthOfWords) {
|
||||||
Set<String> wordSet = new HashSet<String>();
|
Set<String> wordSet = new HashSet<String>();
|
||||||
for (int i = 0; i < numberOfWords; i++) {
|
for (int i = 0; i < numberOfWords; i++) {
|
||||||
|
@ -253,6 +281,5 @@ public class TestColumnRangeFilter {
|
||||||
List<String> wordList = new ArrayList<String>(wordSet);
|
List<String> wordList = new ArrayList<String>(wordSet);
|
||||||
return wordList;
|
return wordList;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -508,7 +508,7 @@ public class TestFilterList {
|
||||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||||
Arrays.asList(new Filter[] { filterMinHint, filterMaxHint }));
|
Arrays.asList(new Filter[] { filterMinHint, filterMaxHint }));
|
||||||
filterList.filterKeyValue(null);
|
filterList.filterKeyValue(null);
|
||||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextCellHint(null), maxKeyValue));
|
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextCellHint(null), minKeyValue));
|
||||||
|
|
||||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||||
Arrays.asList(new Filter[] { filterMaxHint, filterMinHint }));
|
Arrays.asList(new Filter[] { filterMaxHint, filterMinHint }));
|
||||||
|
@ -519,7 +519,7 @@ public class TestFilterList {
|
||||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||||
Arrays.asList(new Filter[] { filterNoHint, filterMinHint, filterMaxHint }));
|
Arrays.asList(new Filter[] { filterNoHint, filterMinHint, filterMaxHint }));
|
||||||
filterList.filterKeyValue(null);
|
filterList.filterKeyValue(null);
|
||||||
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextCellHint(null), maxKeyValue));
|
assertEquals(0, KeyValue.COMPARATOR.compare(filterList.getNextCellHint(null), minKeyValue));
|
||||||
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
filterList = new FilterList(Operator.MUST_PASS_ALL,
|
||||||
Arrays.asList(new Filter[] { filterNoHint, filterMaxHint }));
|
Arrays.asList(new Filter[] { filterNoHint, filterMaxHint }));
|
||||||
filterList.filterKeyValue(null);
|
filterList.filterKeyValue(null);
|
||||||
|
@ -731,10 +731,10 @@ public class TestFilterList {
|
||||||
assertEquals(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, filterList.filterKeyValue(kv1));
|
assertEquals(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, filterList.filterKeyValue(kv1));
|
||||||
|
|
||||||
filterList = new FilterList(Operator.MUST_PASS_ALL, filter4, filter5, filter6);
|
filterList = new FilterList(Operator.MUST_PASS_ALL, filter4, filter5, filter6);
|
||||||
assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, filterList.filterKeyValue(kv1));
|
assertEquals(ReturnCode.NEXT_COL, filterList.filterKeyValue(kv1));
|
||||||
|
|
||||||
filterList = new FilterList(Operator.MUST_PASS_ALL, filter4, filter6);
|
filterList = new FilterList(Operator.MUST_PASS_ALL, filter4, filter6);
|
||||||
assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, filterList.filterKeyValue(kv1));
|
assertEquals(ReturnCode.NEXT_COL, filterList.filterKeyValue(kv1));
|
||||||
|
|
||||||
filterList = new FilterList(Operator.MUST_PASS_ALL, filter3, filter1);
|
filterList = new FilterList(Operator.MUST_PASS_ALL, filter3, filter1);
|
||||||
assertEquals(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, filterList.filterKeyValue(kv1));
|
assertEquals(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, filterList.filterKeyValue(kv1));
|
||||||
|
|
Loading…
Reference in New Issue