HBASE-20565 ColumnRangeFilter combined with ColumnPaginationFilter can produce incorrect result

This commit is contained in:
huzheng 2018-07-17 20:37:39 +08:00
parent 44bf7076b7
commit 1dbfe92dbf
4 changed files with 91 additions and 46 deletions

View File

@ -208,17 +208,20 @@ public class ColumnRangeFilter extends FilterBase {
}
/**
* @param other
* @return true if and only if the fields of the filter that are serialized
* are equal to the corresponding fields in other. Used for testing.
* @param o filter to serialize.
* @return true if and only if the fields of the filter that are serialized are equal to the
* corresponding fields in other. Used for testing.
*/
@Override
boolean areSerializedFieldsEqual(Filter o) {
if (o == this) return true;
if (!(o instanceof ColumnRangeFilter)) return false;
ColumnRangeFilter other = (ColumnRangeFilter)o;
return Bytes.equals(this.getMinColumn(),other.getMinColumn())
if (o == this) {
return true;
}
if (!(o instanceof ColumnRangeFilter)) {
return false;
}
ColumnRangeFilter other = (ColumnRangeFilter) o;
return Bytes.equals(this.getMinColumn(), other.getMinColumn())
&& this.getMinColumnInclusive() == other.getMinColumnInclusive()
&& Bytes.equals(this.getMaxColumn(), other.getMaxColumn())
&& this.getMaxColumnInclusive() == other.getMaxColumnInclusive();

View File

@ -154,6 +154,11 @@ public class FilterListWithAND extends FilterListBase {
"Received code is not valid. rc: " + rc + ", localRC: " + localRC);
}
private boolean isIncludeRelatedReturnCode(ReturnCode rc) {
return isInReturnCodes(rc, ReturnCode.INCLUDE, ReturnCode.INCLUDE_AND_NEXT_COL,
ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW);
}
@Override
public ReturnCode filterCell(Cell c) throws IOException {
if (isEmpty()) {
@ -168,11 +173,16 @@ public class FilterListWithAND extends FilterListBase {
}
ReturnCode localRC;
localRC = filter.filterCell(c);
rc = mergeReturnCode(rc, localRC);
if (localRC == ReturnCode.SEEK_NEXT_USING_HINT) {
seekHintFilters.add(filter);
}
rc = mergeReturnCode(rc, localRC);
// Only when rc is INCLUDE* case, we should pass the cell to the following sub-filters.
// otherwise we may mess up the global state (such as offset, count..) in the following
// sub-filters. (HBASE-20565)
if (!isIncludeRelatedReturnCode(rc)) {
return rc;
}
}
if (!seekHintFilters.isEmpty()) {
return ReturnCode.SEEK_NEXT_USING_HINT;

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.filter;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.testclassification.FilterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -226,40 +228,70 @@ public class TestColumnRangeFilter {
Scan scan = new Scan();
scan.setMaxVersions();
for (StringRange s : rangeMap.keySet()) {
filter = new ColumnRangeFilter(s.getStart() == null ? null
: Bytes.toBytes(s.getStart()), s.isStartInclusive(),
s.getEnd() == null ? null : Bytes.toBytes(s.getEnd()),
filter = new ColumnRangeFilter(s.getStart() == null ? null : Bytes.toBytes(s.getStart()),
s.isStartInclusive(), s.getEnd() == null ? null : Bytes.toBytes(s.getEnd()),
s.isEndInclusive());
scan.setFilter(filter);
ResultScanner scanner = ht.getScanner(scan);
List<Cell> results = new ArrayList<>();
LOG.info("scan column range: " + s.toString());
long timeBeforeScan = System.currentTimeMillis();
Result result;
while ((result = scanner.next()) != null) {
for (Cell kv : result.listCells()) {
results.add(kv);
}
}
long scanTime = System.currentTimeMillis() - timeBeforeScan;
scanner.close();
LOG.info("scan time = " + scanTime + "ms");
LOG.info("found " + results.size() + " results");
LOG.info("Expecting " + rangeMap.get(s).size() + " results");
/*
for (KeyValue kv : results) {
LOG.info("found row " + Bytes.toString(kv.getRow()) + ", column "
+ Bytes.toString(kv.getQualifier()));
}
*/
assertEquals(rangeMap.get(s).size(), results.size());
assertEquals(rangeMap.get(s).size(), cellsCount(ht, filter));
}
ht.close();
}
@Test
public void TestColumnRangeFilterWithColumnPaginationFilter() throws Exception {
String family = "Family";
String table = "TestColumnRangeFilterWithColumnPaginationFilter";
try (Table ht =
TEST_UTIL.createTable(TableName.valueOf(table), Bytes.toBytes(family), Integer.MAX_VALUE)) {
// one row.
String row = "row";
// One version
long timestamp = 100;
// 10 columns
int[] columns = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
String valueString = "ValueString";
Put p = new Put(Bytes.toBytes(row));
p.setDurability(Durability.SKIP_WAL);
for (int column : columns) {
KeyValue kv =
KeyValueTestUtil.create(row, family, Integer.toString(column), timestamp, valueString);
p.add(kv);
}
ht.put(p);
TEST_UTIL.flush();
// Column range from 1 to 9.
StringRange stringRange = new StringRange("1", true, "9", false);
ColumnRangeFilter filter1 = new ColumnRangeFilter(Bytes.toBytes(stringRange.getStart()),
stringRange.isStartInclusive(), Bytes.toBytes(stringRange.getEnd()),
stringRange.isEndInclusive());
ColumnPaginationFilter filter2 = new ColumnPaginationFilter(5, 0);
ColumnPaginationFilter filter3 = new ColumnPaginationFilter(5, 1);
ColumnPaginationFilter filter4 = new ColumnPaginationFilter(5, 2);
ColumnPaginationFilter filter5 = new ColumnPaginationFilter(5, 6);
ColumnPaginationFilter filter6 = new ColumnPaginationFilter(5, 9);
assertEquals(5, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter2)));
assertEquals(5, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter3)));
assertEquals(5, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter4)));
assertEquals(2, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter5)));
assertEquals(0, cellsCount(ht, new FilterList(Operator.MUST_PASS_ALL, filter1, filter6)));
}
}
private int cellsCount(Table table, Filter filter) throws IOException {
Scan scan = new Scan().setFilter(filter).readAllVersions();
try (ResultScanner scanner = table.getScanner(scan)) {
List<Cell> results = new ArrayList<>();
Result result;
while ((result = scanner.next()) != null) {
result.listCells().forEach(results::add);
}
return results.size();
}
}
List<String> generateRandomWords(int numberOfWords, int maxLengthOfWords) {
Set<String> wordSet = new HashSet<>();
for (int i = 0; i < numberOfWords; i++) {

View File

@ -525,7 +525,7 @@ public class TestFilterList {
filterList = new FilterList(Operator.MUST_PASS_ALL,
Arrays.asList(new Filter [] { filterMinHint, filterMaxHint } ));
filterList.filterCell(null);
assertEquals(0, comparator.compare(filterList.getNextCellHint(null), maxKeyValue));
assertEquals(0, comparator.compare(filterList.getNextCellHint(null), minKeyValue));
filterList = new FilterList(Operator.MUST_PASS_ALL,
Arrays.asList(new Filter [] { filterMaxHint, filterMinHint } ));
@ -536,7 +536,7 @@ public class TestFilterList {
filterList = new FilterList(Operator.MUST_PASS_ALL,
Arrays.asList(new Filter[] { filterNoHint, filterMinHint, filterMaxHint }));
filterList.filterCell(null);
assertEquals(0, comparator.compare(filterList.getNextCellHint(null), maxKeyValue));
assertEquals(0, comparator.compare(filterList.getNextCellHint(null), minKeyValue));
filterList = new FilterList(Operator.MUST_PASS_ALL,
Arrays.asList(new Filter[] { filterNoHint, filterMaxHint }));
filterList.filterCell(null);
@ -744,10 +744,10 @@ public class TestFilterList {
assertEquals(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, filterList.filterCell(kv1));
filterList = new FilterList(Operator.MUST_PASS_ALL, filter4, filter5, filter6);
assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, filterList.filterCell(kv1));
assertEquals(ReturnCode.NEXT_COL, filterList.filterCell(kv1));
filterList = new FilterList(Operator.MUST_PASS_ALL, filter4, filter6);
assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, filterList.filterCell(kv1));
assertEquals(ReturnCode.NEXT_COL, filterList.filterCell(kv1));
filterList = new FilterList(Operator.MUST_PASS_ALL, filter3, filter1);
assertEquals(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, filterList.filterCell(kv1));