HBASE-14269 FuzzyRowFilter omits certain rows when multiple fuzzy keys exist (hongbin ma)

This commit is contained in:
tedyu 2015-08-26 07:23:43 -07:00
parent 506726ed28
commit 6661f2d025
2 changed files with 125 additions and 85 deletions

View File

@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.filter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.PriorityQueue;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
@ -160,82 +160,82 @@ public class FuzzyRowFilter extends FilterBase {
@Override @Override
public Cell getNextCellHint(Cell currentCell) { public Cell getNextCellHint(Cell currentCell) {
boolean result = true; boolean result = tracker.updateTracker(currentCell);
if (tracker.needsUpdate()) {
result = tracker.updateTracker(currentCell);
}
if (result == false) { if (result == false) {
done = true; done = true;
return null; return null;
} }
byte[] nextRowKey = tracker.nextRow(); byte[] nextRowKey = tracker.nextRow();
// We need to compare nextRowKey with currentCell
int compareResult = CellComparator.COMPARATOR.compareRows(currentCell, nextRowKey, 0,
nextRowKey.length);
if ((reversed && compareResult < 0) || (!reversed && compareResult > 0)) {
// This can happen when we have multilpe filters and some other filter
// returns next row with hint which is larger (smaller for reverse)
// than the current (really?)
result = tracker.updateTracker(currentCell);
if (result == false) {
done = true;
return null;
} else {
nextRowKey = tracker.nextRow();
}
}
return KeyValueUtil.createFirstOnRow(nextRowKey); return KeyValueUtil.createFirstOnRow(nextRowKey);
} }
/** /**
* If we have multiple fuzzy keys, row tracker should improve overall performance It calculates * If we have multiple fuzzy keys, row tracker should improve overall performance. It calculates
* all next rows (one per every fuzzy key), sort them accordingly (ascending for regular and * all next rows (one per every fuzzy key) and put them (the fuzzy key is bundled) into a priority
* descending for reverse). Next time getNextCellHint is called we check row tracker first and * queue so that the smallest row key always appears at queue head, which helps to decide the
* return next row from the tracker if it exists, if there are no rows in the tracker we update * "Next Cell Hint". As scanning going on, the number of candidate rows in the RowTracker will
* tracker with a current cell and return first row. * remain the size of fuzzy keys until some of the fuzzy keys won't possibly have matches any
* more.
*/ */
private class RowTracker { private class RowTracker {
private final List<byte[]> nextRows; private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> nextRows;
private int next = -1; private boolean initialized = false;
RowTracker() { RowTracker() {
nextRows = new ArrayList<byte[]>(); nextRows =
} new PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>>(fuzzyKeysData.size(),
new Comparator<Pair<byte[], Pair<byte[], byte[]>>>() {
boolean needsUpdate() { @Override
return next == -1 || next == nextRows.size(); public int compare(Pair<byte[], Pair<byte[], byte[]>> o1,
Pair<byte[], Pair<byte[], byte[]>> o2) {
int compare = Bytes.compareTo(o1.getFirst(), o2.getFirst());
if (!isReversed()) {
return compare;
} else {
return -compare;
}
}
});
} }
byte[] nextRow() { byte[] nextRow() {
if (next < 0 || next == nextRows.size()) return null; if (nextRows.isEmpty()) {
return nextRows.get(next++); throw new IllegalStateException(
"NextRows should not be empty, make sure to call nextRow() after updateTracker() return true");
} else {
return nextRows.peek().getFirst();
}
} }
boolean updateTracker(Cell currentCell) { boolean updateTracker(Cell currentCell) {
nextRows.clear(); if (!initialized) {
for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) { for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
byte[] nextRowKeyCandidate = updateWith(currentCell, fuzzyData);
getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), }
currentCell.getRowOffset(), currentCell.getRowLength(), fuzzyData.getFirst(), initialized = true;
fuzzyData.getSecond()); } else {
if (nextRowKeyCandidate == null) { while (!nextRows.isEmpty() && !lessThan(currentCell, nextRows.peek().getFirst())) {
continue; Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll();
Pair<byte[], byte[]> fuzzyData = head.getSecond();
updateWith(currentCell, fuzzyData);
} }
nextRows.add(nextRowKeyCandidate);
} }
// Sort all next row candidates return !nextRows.isEmpty();
Collections.sort(nextRows, new Comparator<byte[]>() { }
@Override
public int compare(byte[] o1, byte[] o2) { boolean lessThan(Cell currentCell, byte[] nextRowKey) {
if (reversed) { int compareResult =
return -Bytes.compareTo(o1, o2); CellComparator.COMPARATOR.compareRows(currentCell, nextRowKey, 0, nextRowKey.length);
} else { return (!isReversed() && compareResult < 0) || (isReversed() && compareResult > 0);
return Bytes.compareTo(o1, o2); }
}
} void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) {
}); byte[] nextRowKeyCandidate =
next = 0; getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(),
return nextRows.size() > 0; currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
if (nextRowKeyCandidate != null) {
nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData));
}
} }
} }
@ -382,8 +382,8 @@ public class FuzzyRowFilter extends FilterBase {
return SatisfiesCode.YES; return SatisfiesCode.YES;
} }
static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, int length,
int length, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
if (row == null) { if (row == null) {
// do nothing, let scan to proceed // do nothing, let scan to proceed
return SatisfiesCode.YES; return SatisfiesCode.YES;

View File

@ -60,12 +60,14 @@ import com.google.common.collect.Lists;
@Category({ FilterTests.class, MediumTests.class }) @Category({ FilterTests.class, MediumTests.class })
public class TestFuzzyRowFilterEndToEnd { public class TestFuzzyRowFilterEndToEnd {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte fuzzyValue = (byte) 63;
private static final Log LOG = LogFactory.getLog(TestFuzzyRowFilterEndToEnd.class); private static final Log LOG = LogFactory.getLog(TestFuzzyRowFilterEndToEnd.class);
private static int firstPartCardinality = 50; private static int firstPartCardinality = 50;
private static int secondPartCardinality = 40; private static int secondPartCardinality = 50;
private static int colQualifiersTotal = 50; private static int thirdPartCardinality = 50;
private static int totalFuzzyKeys = secondPartCardinality / 2; private static int colQualifiersTotal = 5;
private static int totalFuzzyKeys = thirdPartCardinality / 2;
private static String table = "TestFuzzyRowFilterEndToEnd"; private static String table = "TestFuzzyRowFilterEndToEnd";
@ -119,25 +121,27 @@ public class TestFuzzyRowFilterEndToEnd {
// 4 byte qualifier // 4 byte qualifier
// 4 byte value // 4 byte value
for (int i1 = 0; i1 < firstPartCardinality; i1++) { for (int i0 = 0; i0 < firstPartCardinality; i0++) {
if ((i1 % 1000) == 0) LOG.info("put " + i1);
for (int i2 = 0; i2 < secondPartCardinality; i2++) { for (int i1 = 0; i1 < secondPartCardinality; i1++) {
byte[] rk = new byte[10];
ByteBuffer buf = ByteBuffer.wrap(rk); for (int i2 = 0; i2 < thirdPartCardinality; i2++) {
buf.clear(); byte[] rk = new byte[10];
buf.putShort((short) 2);
buf.putInt(i1);
buf.putInt(i2);
for (int c = 0; c < colQualifiersTotal; c++) {
byte[] cq = new byte[4];
Bytes.putBytes(cq, 0, Bytes.toBytes(c), 0, 4);
Put p = new Put(rk); ByteBuffer buf = ByteBuffer.wrap(rk);
p.setDurability(Durability.SKIP_WAL); buf.clear();
p.add(cf.getBytes(), cq, Bytes.toBytes(c)); buf.putShort((short) i0);
ht.put(p); buf.putInt(i1);
buf.putInt(i2);
for (int c = 0; c < colQualifiersTotal; c++) {
byte[] cq = new byte[4];
Bytes.putBytes(cq, 0, Bytes.toBytes(c), 0, 4);
Put p = new Put(rk);
p.setDurability(Durability.SKIP_WAL);
p.add(cf.getBytes(), cq, Bytes.toBytes(c));
ht.put(p);
}
} }
} }
} }
@ -145,11 +149,12 @@ public class TestFuzzyRowFilterEndToEnd {
TEST_UTIL.flush(); TEST_UTIL.flush();
// test passes // test passes
runTest(ht); runTest1(ht);
runTest2(ht);
} }
private void runTest(Table hTable) throws IOException { private void runTest1(Table hTable) throws IOException {
// [0, 2, ?, ?, ?, ?, 0, 0, 0, 1] // [0, 2, ?, ?, ?, ?, 0, 0, 0, 1]
byte[] mask = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 }; byte[] mask = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 };
@ -161,7 +166,7 @@ public class TestFuzzyRowFilterEndToEnd {
buf.clear(); buf.clear();
buf.putShort((short) 2); buf.putShort((short) 2);
for (int j = 0; j < 4; j++) { for (int j = 0; j < 4; j++) {
buf.put((byte) 63); buf.put(fuzzyValue);
} }
buf.putInt(i); buf.putInt(i);
@ -169,7 +174,41 @@ public class TestFuzzyRowFilterEndToEnd {
list.add(pair); list.add(pair);
} }
int expectedSize = firstPartCardinality * totalFuzzyKeys * colQualifiersTotal; int expectedSize = secondPartCardinality * totalFuzzyKeys * colQualifiersTotal;
FuzzyRowFilter fuzzyRowFilter0 = new FuzzyRowFilter(list);
// Filters are not stateless - we can't reuse them
FuzzyRowFilter fuzzyRowFilter1 = new FuzzyRowFilter(list);
// regular test
runScanner(hTable, expectedSize, fuzzyRowFilter0);
// optimized from block cache
runScanner(hTable, expectedSize, fuzzyRowFilter1);
}
private void runTest2(Table hTable) throws IOException {
// [0, 0, ?, ?, ?, ?, 0, 0, 0, 0] , [0, 1, ?, ?, ?, ?, 0, 0, 0, 1]...
byte[] mask = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 };
List<Pair<byte[], byte[]>> list = new ArrayList<Pair<byte[], byte[]>>();
for (int i = 0; i < totalFuzzyKeys; i++) {
byte[] fuzzyKey = new byte[10];
ByteBuffer buf = ByteBuffer.wrap(fuzzyKey);
buf.clear();
buf.putShort((short) (i * 2));
for (int j = 0; j < 4; j++) {
buf.put(fuzzyValue);
}
buf.putInt(i * 2);
Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask);
list.add(pair);
}
int expectedSize = totalFuzzyKeys * secondPartCardinality * colQualifiersTotal;
FuzzyRowFilter fuzzyRowFilter0 = new FuzzyRowFilter(list); FuzzyRowFilter fuzzyRowFilter0 = new FuzzyRowFilter(list);
// Filters are not stateless - we can't reuse them // Filters are not stateless - we can't reuse them
FuzzyRowFilter fuzzyRowFilter1 = new FuzzyRowFilter(list); FuzzyRowFilter fuzzyRowFilter1 = new FuzzyRowFilter(list);
@ -208,7 +247,7 @@ public class TestFuzzyRowFilterEndToEnd {
assertEquals(expectedSize, found); assertEquals(expectedSize, found);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Test @Test
public void testFilterList() throws Exception { public void testFilterList() throws Exception {
@ -261,7 +300,7 @@ public class TestFuzzyRowFilterEndToEnd {
buf.clear(); buf.clear();
buf.putShort((short) 2); buf.putShort((short) 2);
for (int i = 0; i < 4; i++) for (int i = 0; i < 4; i++)
buf.put((byte) 63); buf.put(fuzzyValue);
buf.putInt((short) 1); buf.putInt((short) 1);
byte[] mask1 = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 }; byte[] mask1 = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 };
@ -271,7 +310,7 @@ public class TestFuzzyRowFilterEndToEnd {
buf.putShort((short) 2); buf.putShort((short) 2);
buf.putInt((short) 2); buf.putInt((short) 2);
for (int i = 0; i < 4; i++) for (int i = 0; i < 4; i++)
buf.put((byte) 63); buf.put(fuzzyValue);
byte[] mask2 = new byte[] { 0, 0, 0, 0, 0, 0, 1, 1, 1, 1 }; byte[] mask2 = new byte[] { 0, 0, 0, 0, 0, 0, 1, 1, 1, 1 };
@ -284,7 +323,8 @@ public class TestFuzzyRowFilterEndToEnd {
runScanner(hTable, expectedSize, fuzzyRowFilter1, fuzzyRowFilter2); runScanner(hTable, expectedSize, fuzzyRowFilter1, fuzzyRowFilter2);
} }
private void runScanner(Table hTable, int expectedSize, Filter filter1, Filter filter2) throws IOException { private void runScanner(Table hTable, int expectedSize, Filter filter1, Filter filter2)
throws IOException {
String cf = "f"; String cf = "f";
Scan scan = new Scan(); Scan scan = new Scan();
scan.addFamily(cf.getBytes()); scan.addFamily(cf.getBytes());