HBASE-17125 Inconsistent result when use filter to read data

This commit is contained in:
Guanghao Zhang 2017-08-10 21:03:50 +08:00
parent b3e7e31dee
commit 8197a31bbc
10 changed files with 465 additions and 74 deletions

View File

@ -267,10 +267,12 @@ public class Get extends Query
/**
* Get all available versions.
* @return this for invocation chaining
* @deprecated It is easy to misunderstand with column family's max versions, so use
* {@link #readAllVersions()} instead.
*/
@Deprecated
public Get setMaxVersions() {
this.maxVersions = Integer.MAX_VALUE;
return this;
return readAllVersions();
}
/**
@ -278,12 +280,34 @@ public class Get extends Query
* @param maxVersions maximum versions for each column
* @throws IOException if invalid number of versions
* @return this for invocation chaining
* @deprecated It is easy to misunderstand with column family's max versions, so use
* {@link #readVersions(int)} instead.
*/
@Deprecated
public Get setMaxVersions(int maxVersions) throws IOException {
if(maxVersions <= 0) {
throw new IOException("maxVersions must be positive");
return readVersions(maxVersions);
}
/**
 * Requests every available version of each column by raising this Get's per-column
 * version limit to {@link Integer#MAX_VALUE}.
 * @return this for invocation chaining
 */
public Get readAllVersions() {
  maxVersions = Integer.MAX_VALUE;
  return this;
}
/**
* Get up to the specified number of versions of each column.
* @param versions specified number of versions for each column
* @throws IOException if invalid number of versions
* @return this for invocation chaining
*/
public Get readVersions(int versions) throws IOException {
if (versions <= 0) {
throw new IOException("versions must be positive");
}
this.maxVersions = maxVersions;
this.maxVersions = versions;
return this;
}

View File

@ -53,9 +53,9 @@ public abstract class Query extends OperationWithAttributes {
}
/**
* Apply the specified server-side filter when performing the Query.
* Only {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)} is called AFTER all tests
* for ttl, column match, deletes and max versions have been run.
* Apply the specified server-side filter when performing the Query. Only
* {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)} is called AFTER all tests for ttl,
* column match, deletes and column family's max versions have been run.
* @param filter filter to run on the server
* @return this for invocation chaining
*/

View File

@ -593,19 +593,42 @@ public class Scan extends Query {
/**
* Get all available versions.
* @return this
* @deprecated It is easy to misunderstand with column family's max versions, so use
* {@link #readAllVersions()} instead.
*/
@Deprecated
public Scan setMaxVersions() {
this.maxVersions = Integer.MAX_VALUE;
return this;
return readAllVersions();
}
/**
* Get up to the specified number of versions of each column.
* @param maxVersions maximum versions for each column
* @return this
* @deprecated It is easy to misunderstand with column family's max versions, so use
* {@link #readVersions(int)} instead.
*/
@Deprecated
public Scan setMaxVersions(int maxVersions) {
this.maxVersions = maxVersions;
return readVersions(maxVersions);
}
/**
 * Requests every available version of each column by raising this Scan's per-column
 * version limit to {@link Integer#MAX_VALUE}.
 * @return this
 */
public Scan readAllVersions() {
  maxVersions = Integer.MAX_VALUE;
  return this;
}
/**
 * Limits the number of versions read for each column. The value is stored as given; this
 * method performs no range check on it.
 * @param versions number of versions to read for each column
 * @return this
 */
public Scan readVersions(int versions) {
  maxVersions = versions;
  return this;
}

View File

@ -355,13 +355,16 @@ public abstract class ScanQueryMatcher implements ShipperListener {
NavigableSet<byte[]> columns, ScanInfo scanInfo, long oldestUnexpiredTS, Scan userScan)
throws IOException {
int resultMaxVersion = scanInfo.getMaxVersions();
int maxVersionToCheck = resultMaxVersion;
if (userScan != null) {
if (userScan.isRaw()) {
resultMaxVersion = userScan.getMaxVersions();
} else {
resultMaxVersion = Math.min(userScan.getMaxVersions(), scanInfo.getMaxVersions());
}
maxVersionToCheck = userScan.hasFilter() ? scanInfo.getMaxVersions() : resultMaxVersion;
}
DeleteTracker deleteTracker;
if (scanInfo.isNewVersionBehavior() && (userScan == null || !userScan.isRaw())) {
deleteTracker = new NewVersionBehaviorTracker(columns, scanInfo.getMinVersions(),
@ -382,11 +385,11 @@ public abstract class ScanQueryMatcher implements ShipperListener {
if (deleteTracker instanceof NewVersionBehaviorTracker) {
columnTracker = (NewVersionBehaviorTracker) deleteTracker;
} else if (columns == null || columns.size() == 0) {
columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), resultMaxVersion,
columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersionToCheck,
oldestUnexpiredTS);
} else {
columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(),
resultMaxVersion, oldestUnexpiredTS);
maxVersionToCheck, oldestUnexpiredTS);
}
return new Pair<>(deleteTracker, columnTracker);
}

View File

@ -36,8 +36,9 @@ import org.apache.hadoop.hbase.util.Bytes;
public class ScanWildcardColumnTracker implements ColumnTracker {
private Cell columnCell = null;
private int currentCount = 0;
private int maxVersions;
private int minVersions;
private final int maxVersions;
private final int minVersions;
/*
* Keeps track of the latest timestamp and type included for current column. Used to eliminate
* duplicates.
@ -74,7 +75,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
@Override
public ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type,
boolean ignoreCount) throws IOException {
if (columnCell == null) {
// first iteration.
resetCell(cell);
@ -143,7 +143,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
} else {
return MatchCode.SEEK_NEXT_COL;
}
}
@Override

View File

@ -22,6 +22,7 @@ import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
@ -52,6 +53,12 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
protected final TimeRange tr;
private final int versionsAfterFilter;
private int count = 0;
private Cell curColCell = null;
private static Cell createStartKey(Scan scan, ScanInfo scanInfo) {
if (scan.includeStartRow()) {
return createStartKeyFromRow(scan.getStartRow(), scanInfo);
@ -65,6 +72,13 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
super(createStartKey(scan, scanInfo), scanInfo, columns, oldestUnexpiredTS, now);
this.hasNullColumn = hasNullColumn;
this.filter = scan.getFilter();
if (this.filter != null) {
this.versionsAfterFilter =
scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(),
scanInfo.getMaxVersions());
} else {
this.versionsAfterFilter = 0;
}
this.stopRow = scan.getStopRow();
TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily());
if (timeRange == null) {
@ -98,6 +112,14 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
}
}
/**
 * Runs the superclass hook, then, if a current-column cell is being tracked for the
 * post-filter version count, swaps it for the cell returned by
 * {@link KeyValueUtil#toNewKeyCell(Cell)}.
 * NOTE(review): presumably this avoids holding on to the shipped cell's backing memory - confirm.
 * @throws IOException if the superclass hook fails
 */
@Override
public void beforeShipped() throws IOException {
  super.beforeShipped();
  if (curColCell == null) {
    // Nothing tracked yet, so there is nothing to copy.
    return;
  }
  curColCell = KeyValueUtil.toNewKeyCell(curColCell);
}
protected final MatchCode matchColumn(Cell cell, long timestamp, byte typeByte)
throws IOException {
int tsCmp = tr.compare(timestamp);
@ -108,57 +130,111 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
return columns.getNextRowOrNextColumn(cell);
}
// STEP 1: Check if the column is part of the requested columns
MatchCode colChecker = columns.checkColumn(cell, typeByte);
if (colChecker != MatchCode.INCLUDE) {
return colChecker;
}
ReturnCode filterResponse = ReturnCode.SKIP;
// STEP 2: Yes, the column is part of the requested columns. Check if filter is present
if (filter != null) {
// STEP 3: Filter the key value and return if it filters out
filterResponse = filter.filterKeyValue(cell);
switch (filterResponse) {
case SKIP:
return MatchCode.SKIP;
case NEXT_COL:
return columns.getNextRowOrNextColumn(cell);
case NEXT_ROW:
return MatchCode.SEEK_NEXT_ROW;
case SEEK_NEXT_USING_HINT:
return MatchCode.SEEK_NEXT_USING_HINT;
default:
// It means it is either include or include and seek next
break;
}
MatchCode matchCode = columns.checkColumn(cell, typeByte);
if (matchCode != MatchCode.INCLUDE) {
return matchCode;
}
/*
* STEP 4: Reaching this step means the column is part of the requested columns and either
* the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
* Now check the number of versions needed. This method call returns SKIP, INCLUDE,
* INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
*
* FilterResponse ColumnChecker Desired behavior
* INCLUDE SKIP row has already been included, SKIP.
* INCLUDE INCLUDE INCLUDE
* INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
* INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
* INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP.
* INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL
* INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
* INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
*
* In all the above scenarios, we return the column checker return value except for
* FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
* STEP 2: check the number of versions needed. This method call returns SKIP, SEEK_NEXT_COL,
* INCLUDE, INCLUDE_AND_SEEK_NEXT_COL, or INCLUDE_AND_SEEK_NEXT_ROW.
*/
colChecker = columns.checkVersions(cell, timestamp, typeByte, false);
if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (colChecker != MatchCode.SKIP) {
return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
}
return MatchCode.SEEK_NEXT_ROW;
matchCode = columns.checkVersions(cell, timestamp, typeByte, false);
switch (matchCode) {
case SKIP:
return MatchCode.SKIP;
case SEEK_NEXT_COL:
return MatchCode.SEEK_NEXT_COL;
default:
// It means it is INCLUDE, INCLUDE_AND_SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_ROW.
assert matchCode == MatchCode.INCLUDE || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL
|| matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
break;
}
return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && colChecker == MatchCode.INCLUDE)
? MatchCode.INCLUDE_AND_SEEK_NEXT_COL : colChecker;
return filter == null ? matchCode : mergeFilterResponse(cell, matchCode,
filter.filterKeyValue(cell));
}
/*
 * Call this when the scan has a filter. Combines the MatchCode produced by the column
 * tracker's checkVersions with the ReturnCode produced by the filter's filterKeyValue.
 * Because the filter may skip cells, the number of versions that reach the result can be
 * smaller than what the column tracker counted, so the versions are counted a second time
 * here, after the filter, against versionsAfterFilter.
 *
 * ColumnChecker                FilterResponse               Desired behavior
 * INCLUDE                      SKIP                         SKIP
 * INCLUDE                      NEXT_COL                     SEEK_NEXT_COL or SEEK_NEXT_ROW
 * INCLUDE                      NEXT_ROW                     SEEK_NEXT_ROW
 * INCLUDE                      SEEK_NEXT_USING_HINT         SEEK_NEXT_USING_HINT
 * INCLUDE                      INCLUDE                      INCLUDE
 * INCLUDE                      INCLUDE_AND_NEXT_COL         INCLUDE_AND_SEEK_NEXT_COL
 * INCLUDE                      INCLUDE_AND_SEEK_NEXT_ROW    INCLUDE_AND_SEEK_NEXT_ROW
 * INCLUDE_AND_SEEK_NEXT_COL    SKIP                         SEEK_NEXT_COL
 * INCLUDE_AND_SEEK_NEXT_COL    NEXT_COL                     SEEK_NEXT_COL or SEEK_NEXT_ROW
 * INCLUDE_AND_SEEK_NEXT_COL    NEXT_ROW                     SEEK_NEXT_ROW
 * INCLUDE_AND_SEEK_NEXT_COL    SEEK_NEXT_USING_HINT         SEEK_NEXT_USING_HINT
 * INCLUDE_AND_SEEK_NEXT_COL    INCLUDE                      INCLUDE_AND_SEEK_NEXT_COL
 * INCLUDE_AND_SEEK_NEXT_COL    INCLUDE_AND_NEXT_COL         INCLUDE_AND_SEEK_NEXT_COL
 * INCLUDE_AND_SEEK_NEXT_COL    INCLUDE_AND_SEEK_NEXT_ROW    INCLUDE_AND_SEEK_NEXT_ROW
 * INCLUDE_AND_SEEK_NEXT_ROW    SKIP                         SEEK_NEXT_ROW
 * INCLUDE_AND_SEEK_NEXT_ROW    NEXT_COL                     SEEK_NEXT_ROW
 * INCLUDE_AND_SEEK_NEXT_ROW    NEXT_ROW                     SEEK_NEXT_ROW
 * INCLUDE_AND_SEEK_NEXT_ROW    SEEK_NEXT_USING_HINT         SEEK_NEXT_USING_HINT
 * INCLUDE_AND_SEEK_NEXT_ROW    INCLUDE                      INCLUDE_AND_SEEK_NEXT_ROW
 * INCLUDE_AND_SEEK_NEXT_ROW    INCLUDE_AND_NEXT_COL         INCLUDE_AND_SEEK_NEXT_ROW
 * INCLUDE_AND_SEEK_NEXT_ROW    INCLUDE_AND_SEEK_NEXT_ROW    INCLUDE_AND_SEEK_NEXT_ROW
 *
 * Every "include" outcome above is additionally subject to the post-filter version count:
 * once more than versionsAfterFilter cells of the current column have been included, the
 * cell is dropped and the include code is downgraded to the matching seek code
 * (INCLUDE / INCLUDE_AND_SEEK_NEXT_COL -> SEEK_NEXT_COL,
 * INCLUDE_AND_SEEK_NEXT_ROW -> SEEK_NEXT_ROW).
 *
 * @param cell the cell being matched
 * @param matchCode result of the column tracker's version check; expected to be one of
 *          INCLUDE, INCLUDE_AND_SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_ROW
 * @param filterResponse the filter's verdict for the cell
 * @return the match code the scanner should act on
 * @throws RuntimeException if filterResponse is a ReturnCode this method does not handle
 */
private final MatchCode mergeFilterResponse(Cell cell, MatchCode matchCode,
    ReturnCode filterResponse) {
  switch (filterResponse) {
    case SKIP:
      if (matchCode == MatchCode.INCLUDE) {
        return MatchCode.SKIP;
      } else if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
        return MatchCode.SEEK_NEXT_COL;
      } else if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
        return MatchCode.SEEK_NEXT_ROW;
      }
      break;
    case NEXT_COL:
      if (matchCode == MatchCode.INCLUDE || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
        return columns.getNextRowOrNextColumn(cell);
      } else if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
        return MatchCode.SEEK_NEXT_ROW;
      }
      break;
    case NEXT_ROW:
      return MatchCode.SEEK_NEXT_ROW;
    case SEEK_NEXT_USING_HINT:
      return MatchCode.SEEK_NEXT_USING_HINT;
    case INCLUDE:
      break;
    case INCLUDE_AND_NEXT_COL:
      if (matchCode == MatchCode.INCLUDE) {
        matchCode = MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
      }
      break;
    case INCLUDE_AND_SEEK_NEXT_ROW:
      break;
    default:
      throw new RuntimeException("UNEXPECTED");
  }

  // It means it is INCLUDE, INCLUDE_AND_SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_ROW.
  assert matchCode == MatchCode.INCLUDE || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL
      || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;

  // Count the versions of the current column that survived the filter. This must happen for
  // every include code, not only for plain INCLUDE: the column tracker is limited by the
  // column family's max versions, which can be larger than the versions the scan asked for,
  // so a tracker-driven INCLUDE_AND_SEEK_NEXT_* may arrive after the scan's quota is used up.
  if (curColCell == null || !CellUtil.matchingRowColumn(cell, curColCell)) {
    count = 0;
    curColCell = cell;
  }
  count += 1;

  if (count > versionsAfterFilter) {
    // Quota exceeded: drop the cell but keep the seek part of the decision, taking the larger
    // step when the tracker already wanted to leave the row.
    return matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW
        ? MatchCode.SEEK_NEXT_ROW
        : MatchCode.SEEK_NEXT_COL;
  }
  return matchCode;
}
protected abstract boolean isGet();

View File

@ -88,6 +88,8 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@ -6424,4 +6426,197 @@ public class TestFromClientSide {
}
}
/**
 * Deleting one explicit version of a column must remove exactly that version, and deleting a
 * version that was never written must leave the remaining versions untouched.
 */
@Test
public void testDeleteSpecifiedVersionOfSpecifiedColumn() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  byte[][] VALUES = makeN(VALUE, 5);
  long[] ts = { 1000, 2000, 3000, 4000, 5000 };

  // try-with-resources: the table and admin are released even when an assertion fails.
  try (Admin admin = TEST_UTIL.getAdmin();
      Table ht = TEST_UTIL.createTable(tableName, FAMILY, 5)) {
    Put put = new Put(ROW);
    // Put version 1000,2000,3000,4000 of column FAMILY:QUALIFIER
    for (int t = 0; t < 4; t++) {
      put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]);
    }
    ht.put(put);

    Delete delete = new Delete(ROW);
    // Delete version 3000 of column FAMILY:QUALIFIER
    delete.addColumn(FAMILY, QUALIFIER, ts[2]);
    ht.delete(delete);

    Get get = new Get(ROW);
    get.addColumn(FAMILY, QUALIFIER);
    get.setMaxVersions(Integer.MAX_VALUE);
    Result result = ht.get(get);
    // verify version 1000,2000,4000 remains for column FAMILY:QUALIFIER
    assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[3] },
      new byte[][] { VALUES[0], VALUES[1], VALUES[3] }, 0, 2);

    delete = new Delete(ROW);
    // Delete a version 5000 of column FAMILY:QUALIFIER which didn't exist
    delete.addColumn(FAMILY, QUALIFIER, ts[4]);
    ht.delete(delete);

    get = new Get(ROW);
    get.addColumn(FAMILY, QUALIFIER);
    get.setMaxVersions(Integer.MAX_VALUE);
    result = ht.get(get);
    // verify version 1000,2000,4000 remains for column FAMILY:QUALIFIER
    assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[3] },
      new byte[][] { VALUES[0], VALUES[1], VALUES[3] }, 0, 2);
  }
}
/**
 * Deleting a column without a timestamp must remove only its latest version; repeating the
 * delete peels off further versions, and a later put becomes visible next to what is left.
 */
@Test
public void testDeleteLatestVersionOfSpecifiedColumn() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  byte[][] VALUES = makeN(VALUE, 5);
  long[] ts = { 1000, 2000, 3000, 4000, 5000 };

  // try-with-resources: the table and admin are released even when an assertion fails.
  try (Admin admin = TEST_UTIL.getAdmin();
      Table ht = TEST_UTIL.createTable(tableName, FAMILY, 5)) {
    Put put = new Put(ROW);
    // Put version 1000,2000,3000,4000 of column FAMILY:QUALIFIER
    for (int t = 0; t < 4; t++) {
      put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]);
    }
    ht.put(put);

    Delete delete = new Delete(ROW);
    // Delete latest version of column FAMILY:QUALIFIER
    delete.addColumn(FAMILY, QUALIFIER);
    ht.delete(delete);

    Get get = new Get(ROW);
    get.addColumn(FAMILY, QUALIFIER);
    get.setMaxVersions(Integer.MAX_VALUE);
    Result result = ht.get(get);
    // verify version 1000,2000,3000 remains for column FAMILY:QUALIFIER
    assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[2] },
      new byte[][] { VALUES[0], VALUES[1], VALUES[2] }, 0, 2);

    delete = new Delete(ROW);
    // Delete two latest version of column FAMILY:QUALIFIER
    delete.addColumn(FAMILY, QUALIFIER);
    delete.addColumn(FAMILY, QUALIFIER);
    ht.delete(delete);

    get = new Get(ROW);
    get.addColumn(FAMILY, QUALIFIER);
    get.setMaxVersions(Integer.MAX_VALUE);
    result = ht.get(get);
    // verify version 1000 remains for column FAMILY:QUALIFIER
    assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0] },
      new byte[][] { VALUES[0] }, 0, 0);

    put = new Put(ROW);
    // Put a version 5000 of column FAMILY:QUALIFIER
    put.addColumn(FAMILY, QUALIFIER, ts[4], VALUES[4]);
    ht.put(put);

    get = new Get(ROW);
    get.addColumn(FAMILY, QUALIFIER);
    get.setMaxVersions(Integer.MAX_VALUE);
    result = ht.get(get);
    // verify version 1000,5000 remains for column FAMILY:QUALIFIER
    assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[4] },
      new byte[][] { VALUES[0], VALUES[4] }, 0, 1);
  }
}
/**
 * Test for HBASE-17125: a filtered read must only see the versions that are visible without
 * the filter. Four versions are written to a table created with 3 as the versions argument:
 * ts 1000 and 2000 carry "value-a", ts 3000 and 4000 carry "value-b". With a filter on
 * "value-a" the only expected cell is ts 2000, whatever number of versions (1, 3 or 5) the
 * Scan or Get asks for; ts 1000 is expected to be out of the user's view.
 */
@Test
public void testReadWithFilter() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  byte[] VALUEA = Bytes.toBytes("value-a");
  byte[] VALUEB = Bytes.toBytes("value-b");
  long[] ts = { 1000, 2000, 3000, 4000 };

  // try-with-resources: the table and admin are released even when an assertion fails.
  try (Admin admin = TEST_UTIL.getAdmin();
      Table table = TEST_UTIL.createTable(tableName, FAMILY, 3)) {
    Put put = new Put(ROW);
    // Put version 1000,2000,3000,4000 of column FAMILY:QUALIFIER
    for (int t = 0; t <= 3; t++) {
      if (t <= 1) {
        put.addColumn(FAMILY, QUALIFIER, ts[t], VALUEA);
      } else {
        put.addColumn(FAMILY, QUALIFIER, ts[t], VALUEB);
      }
    }
    table.put(put);

    // Requested versions equal to, below and above the 3 used when creating the table. In
    // every case ts[0] has gone from user view, so only ts[1] matches the "value-a" filter.
    for (int versions : new int[] { 3, 1, 5 }) {
      Scan scan = new Scan()
          .setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a")))
          .setMaxVersions(versions);
      // Close the scanner once the first row has been checked.
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result scanResult = scanner.next();
        assertNResult(scanResult, ROW, FAMILY, QUALIFIER, new long[] { ts[1] },
          new byte[][] { VALUEA }, 0, 0);
      }

      Get get = new Get(ROW)
          .setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a")))
          .setMaxVersions(versions);
      Result getResult = table.get(get);
      assertNResult(getResult, ROW, FAMILY, QUALIFIER, new long[] { ts[1] },
        new byte[][] { VALUEA }, 0, 0);
    }
  }
}
}

View File

@ -123,6 +123,8 @@ import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@ -2638,6 +2640,68 @@ public class TestHRegion {
}
}
/**
 * Verifies that a Get with a value filter returns the same cells before and after
 * flush/compaction. Four versions are written to a family limited to 3 versions; only the
 * oldest and the newest carry a value containing "value", and the oldest one is expected to be
 * out of the user's view, so the filtered Get must return only the newest cell.
 */
@Test
public void testGetWithFilter() throws IOException, InterruptedException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
byte[] col1 = Bytes.toBytes("col1");
byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2");
final int maxVersions = 3;
HColumnDescriptor hcd = new HColumnDescriptor(fam1);
hcd.setMaxVersions(maxVersions);
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
htd.addFamily(hcd);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
try {
// Put 4 versions (ts 0..3) of the same column; only ts 0 and ts 3 contain "value"
long ts = 0;
Put put = new Put(row1, ts);
put.addColumn(fam1, col1, value1);
region.put(put);
put = new Put(row1, ts + 1);
put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
region.put(put);
put = new Put(row1, ts + 2);
put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
region.put(put);
put = new Put(row1, ts + 3);
put.addColumn(fam1, col1, value2);
region.put(put);
Get get = new Get(row1);
get.setMaxVersions();
Result res = region.get(get);
// Get 3 versions, the oldest version has gone from user view
assertEquals(maxVersions, res.size());
get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value")));
res = region.get(get);
// When a value filter is used, the oldest version should still be gone from user view,
// so the Get should return only one key value
assertEquals(1, res.size());
assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
region.flush(true);
region.compact(true);
// NOTE(review): fixed one-second pause after compaction; the reason is not visible here -
// confirm whether it is still required.
Thread.sleep(1000);
res = region.get(get);
// After flush and compact, the result should be consistent with previous result
assertEquals(1, res.size());
assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
} finally {
// Always release the region and its WAL, even when an assertion above fails
HBaseTestingUtility.closeRegionAndWAL(this.region);
this.region = null;
}
}
// ////////////////////////////////////////////////////////////////////////////
// Scanner tests
// ////////////////////////////////////////////////////////////////////////////

View File

@ -431,24 +431,27 @@ public class TestMinVersions {
tss.add(ts-1);
tss.add(ts-2);
// Should only get T2, versions is 2, so T1 is gone from user view.
Get g = new Get(T1);
g.addColumn(c1,c1);
g.setFilter(new TimestampsFilter(tss));
g.setMaxVersions();
Result r = region.get(g);
checkResult(r, c1, T2,T1);
checkResult(r, c1, T2);
// Should only get T2, versions is 2, so T1 is gone from user view.
g = new Get(T1);
g.addColumn(c0,c0);
g.setFilter(new TimestampsFilter(tss));
g.setMaxVersions();
r = region.get(g);
checkResult(r, c0, T2,T1);
checkResult(r, c0, T2);
// now flush/compact
region.flush(true);
region.compact(true);
// After flush/compact, the result should be consistent with previous result
g = new Get(T1);
g.addColumn(c1,c1);
g.setFilter(new TimestampsFilter(tss));
@ -456,6 +459,7 @@ public class TestMinVersions {
r = region.get(g);
checkResult(r, c1, T2);
// After flush/compact, the result should be consistent with previous result
g = new Get(T1);
g.addColumn(c0,c0);
g.setFilter(new TimestampsFilter(tss));

View File

@ -1061,10 +1061,11 @@ public class TestStore {
@Test
public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
final int expectedSize = 3;
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == 2) {
if (currentSize == expectedSize - 1) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
@ -1078,16 +1079,17 @@ public class TestStore {
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
return ReturnCode.INCLUDE;
}
});
}, expectedSize);
}
@Test
public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
final int expectedSize = 2;
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == 2) {
if (currentSize == expectedSize - 1) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
@ -1106,16 +1108,17 @@ public class TestStore {
return ReturnCode.INCLUDE;
}
}
});
}, expectedSize);
}
@Test
public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
final int expectedSize = 2;
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
if (currentSize == 2) {
if (currentSize == expectedSize - 1) {
try {
flushStore(store, id++);
timeToGetHint.set(true);
@ -1138,10 +1141,10 @@ public class TestStore {
public Cell getNextCellHint(Cell currentCell) throws IOException {
return currentCell;
}
});
}, expectedSize);
}
private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter)
private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
HColumnDescriptor hcd = new HColumnDescriptor(family);
@ -1182,7 +1185,7 @@ public class TestStore {
scan, null, seqId + 3)){
// r1
scanner.next(myList);
assertEquals(3, myList.size());
assertEquals(expectedSize, myList.size());
for (Cell c : myList) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value1)