From 8197a31bbc4c49b4edfc2a0f01b3ef29b40e268d Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Thu, 10 Aug 2017 21:03:50 +0800 Subject: [PATCH] HBASE-17125 Inconsistent result when use filter to read data --- .../org/apache/hadoop/hbase/client/Get.java | 34 ++- .../org/apache/hadoop/hbase/client/Query.java | 6 +- .../org/apache/hadoop/hbase/client/Scan.java | 29 ++- .../querymatcher/ScanQueryMatcher.java | 7 +- .../ScanWildcardColumnTracker.java | 7 +- .../querymatcher/UserScanQueryMatcher.java | 170 ++++++++++----- .../hbase/client/TestFromClientSide.java | 195 ++++++++++++++++++ .../hbase/regionserver/TestHRegion.java | 64 ++++++ .../hbase/regionserver/TestMinVersions.java | 8 +- .../hadoop/hbase/regionserver/TestStore.java | 19 +- 10 files changed, 465 insertions(+), 74 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index b774a9a9396..086a0b449f6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -267,10 +267,12 @@ public class Get extends Query /** * Get all available versions. * @return this for invocation chaining + * @deprecated It is easy to misunderstand with column family's max versions, so use + * {@link #readAllVersions()} instead. */ + @Deprecated public Get setMaxVersions() { - this.maxVersions = Integer.MAX_VALUE; - return this; + return readAllVersions(); } /** @@ -278,12 +280,34 @@ public class Get extends Query * @param maxVersions maximum versions for each column * @throws IOException if invalid number of versions * @return this for invocation chaining + * @deprecated It is easy to misunderstand with column family's max versions, so use + * {@link #readVersions(int)} instead. 
*/ + @Deprecated public Get setMaxVersions(int maxVersions) throws IOException { - if(maxVersions <= 0) { - throw new IOException("maxVersions must be positive"); + return readVersions(maxVersions); + } + + /** + * Get all available versions. + * @return this for invocation chaining + */ + public Get readAllVersions() { + this.maxVersions = Integer.MAX_VALUE; + return this; + } + + /** + * Get up to the specified number of versions of each column. + * @param versions specified number of versions for each column + * @throws IOException if invalid number of versions + * @return this for invocation chaining + */ + public Get readVersions(int versions) throws IOException { + if (versions <= 0) { + throw new IOException("versions must be positive"); } - this.maxVersions = maxVersions; + this.maxVersions = versions; return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java index 0bf54ae092d..cc9e9d4aecb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java @@ -53,9 +53,9 @@ public abstract class Query extends OperationWithAttributes { } /** - * Apply the specified server-side filter when performing the Query. - * Only {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)} is called AFTER all tests - * for ttl, column match, deletes and max versions have been run. + * Apply the specified server-side filter when performing the Query. Only + * {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)} is called AFTER all tests for ttl, + * column match, deletes and column family's max versions have been run. 
* @param filter filter to run on the server * @return this for invocation chaining */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index e84716f5103..5b75151bbda 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -593,19 +593,42 @@ public class Scan extends Query { /** * Get all available versions. * @return this + * @deprecated It is easy to misunderstand with column family's max versions, so use + * {@link #readAllVersions()} instead. */ + @Deprecated public Scan setMaxVersions() { - this.maxVersions = Integer.MAX_VALUE; - return this; + return readAllVersions(); } /** * Get up to the specified number of versions of each column. * @param maxVersions maximum versions for each column * @return this + * @deprecated It is easy to misunderstand with column family's max versions, so use + * {@link #readVersions(int)} instead. */ + @Deprecated public Scan setMaxVersions(int maxVersions) { - this.maxVersions = maxVersions; + return readVersions(maxVersions); + } + + /** + * Get all available versions. + * @return this + */ + public Scan readAllVersions() { + this.maxVersions = Integer.MAX_VALUE; + return this; + } + + /** + * Get up to the specified number of versions of each column. 
+ * @param versions specified number of versions for each column + * @return this + */ + public Scan readVersions(int versions) { + this.maxVersions = versions; return this; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java index 8bdab086e6b..524d3f744f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java @@ -355,13 +355,16 @@ public abstract class ScanQueryMatcher implements ShipperListener { NavigableSet columns, ScanInfo scanInfo, long oldestUnexpiredTS, Scan userScan) throws IOException { int resultMaxVersion = scanInfo.getMaxVersions(); + int maxVersionToCheck = resultMaxVersion; if (userScan != null) { if (userScan.isRaw()) { resultMaxVersion = userScan.getMaxVersions(); } else { resultMaxVersion = Math.min(userScan.getMaxVersions(), scanInfo.getMaxVersions()); } + maxVersionToCheck = userScan.hasFilter() ? 
scanInfo.getMaxVersions() : resultMaxVersion; } + DeleteTracker deleteTracker; if (scanInfo.isNewVersionBehavior() && (userScan == null || !userScan.isRaw())) { deleteTracker = new NewVersionBehaviorTracker(columns, scanInfo.getMinVersions(), @@ -382,11 +385,11 @@ public abstract class ScanQueryMatcher implements ShipperListener { if (deleteTracker instanceof NewVersionBehaviorTracker) { columnTracker = (NewVersionBehaviorTracker) deleteTracker; } else if (columns == null || columns.size() == 0) { - columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), resultMaxVersion, + columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersionToCheck, oldestUnexpiredTS); } else { columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), - resultMaxVersion, oldestUnexpiredTS); + maxVersionToCheck, oldestUnexpiredTS); } return new Pair<>(deleteTracker, columnTracker); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java index a73cc0b8be4..9f0a4614bda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java @@ -36,8 +36,9 @@ import org.apache.hadoop.hbase.util.Bytes; public class ScanWildcardColumnTracker implements ColumnTracker { private Cell columnCell = null; private int currentCount = 0; - private int maxVersions; - private int minVersions; + private final int maxVersions; + private final int minVersions; + /* * Keeps track of the latest timestamp and type included for current column. Used to eliminate * duplicates. 
@@ -74,7 +75,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker { @Override public ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type, boolean ignoreCount) throws IOException { - if (columnCell == null) { // first iteration. resetCell(cell); @@ -143,7 +143,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker { } else { return MatchCode.SEEK_NEXT_COL; } - } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java index 250a4a3ee42..1debb5e4a0e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java @@ -22,6 +22,7 @@ import java.util.NavigableSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; @@ -52,6 +53,12 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { protected final TimeRange tr; + private final int versionsAfterFilter; + + private int count = 0; + + private Cell curColCell = null; + private static Cell createStartKey(Scan scan, ScanInfo scanInfo) { if (scan.includeStartRow()) { return createStartKeyFromRow(scan.getStartRow(), scanInfo); @@ -65,6 +72,13 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { super(createStartKey(scan, scanInfo), scanInfo, columns, oldestUnexpiredTS, now); this.hasNullColumn = hasNullColumn; this.filter = scan.getFilter(); + if (this.filter != null) { + this.versionsAfterFilter = + scan.isRaw() ? 
scan.getMaxVersions() : Math.min(scan.getMaxVersions(), + scanInfo.getMaxVersions()); + } else { + this.versionsAfterFilter = 0; + } this.stopRow = scan.getStopRow(); TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily()); if (timeRange == null) { @@ -98,6 +112,14 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { } } + @Override + public void beforeShipped() throws IOException { + super.beforeShipped(); + if (curColCell != null) { + this.curColCell = KeyValueUtil.toNewKeyCell(this.curColCell); + } + } + protected final MatchCode matchColumn(Cell cell, long timestamp, byte typeByte) throws IOException { int tsCmp = tr.compare(timestamp); @@ -108,57 +130,111 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher { return columns.getNextRowOrNextColumn(cell); } // STEP 1: Check if the column is part of the requested columns - MatchCode colChecker = columns.checkColumn(cell, typeByte); - if (colChecker != MatchCode.INCLUDE) { - return colChecker; - } - ReturnCode filterResponse = ReturnCode.SKIP; - // STEP 2: Yes, the column is part of the requested columns. Check if filter is present - if (filter != null) { - // STEP 3: Filter the key value and return if it filters out - filterResponse = filter.filterKeyValue(cell); - switch (filterResponse) { - case SKIP: - return MatchCode.SKIP; - case NEXT_COL: - return columns.getNextRowOrNextColumn(cell); - case NEXT_ROW: - return MatchCode.SEEK_NEXT_ROW; - case SEEK_NEXT_USING_HINT: - return MatchCode.SEEK_NEXT_USING_HINT; - default: - // It means it is either include or include and seek next - break; - } + MatchCode matchCode = columns.checkColumn(cell, typeByte); + if (matchCode != MatchCode.INCLUDE) { + return matchCode; } /* - * STEP 4: Reaching this step means the column is part of the requested columns and either - * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response. - * Now check the number of versions needed. 
This method call returns SKIP, INCLUDE, - * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL. - * - * FilterResponse ColumnChecker Desired behavior - * INCLUDE SKIP row has already been included, SKIP. - * INCLUDE INCLUDE INCLUDE - * INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP. - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * - * In all the above scenarios, we return the column checker return value except for - * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE) + * STEP 2: check the number of versions needed. This method call returns SKIP, SEEK_NEXT_COL, + * INCLUDE, INCLUDE_AND_SEEK_NEXT_COL, or INCLUDE_AND_SEEK_NEXT_ROW. */ - colChecker = columns.checkVersions(cell, timestamp, typeByte, false); - if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) { - if (colChecker != MatchCode.SKIP) { - return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; - } - return MatchCode.SEEK_NEXT_ROW; + matchCode = columns.checkVersions(cell, timestamp, typeByte, false); + switch (matchCode) { + case SKIP: + return MatchCode.SKIP; + case SEEK_NEXT_COL: + return MatchCode.SEEK_NEXT_COL; + default: + // It means it is INCLUDE, INCLUDE_AND_SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_ROW. + assert matchCode == MatchCode.INCLUDE || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL + || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; + break; } - return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && colChecker == MatchCode.INCLUDE) - ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL : colChecker; + + return filter == null ? matchCode : mergeFilterResponse(cell, matchCode, + filter.filterKeyValue(cell)); + } + + /* + * Call this when scan has filter. 
Decide the desired behavior by checkVersions's MatchCode + * and filterKeyValue's ReturnCode. Cell may be skipped by filter, so the column versions + * in result may be less than user need. It will check versions again after filter. + * + * ColumnChecker FilterResponse Desired behavior + * INCLUDE SKIP SKIP + * INCLUDE NEXT_COL SEEK_NEXT_COL or SEEK_NEXT_ROW + * INCLUDE NEXT_ROW SEEK_NEXT_ROW + * INCLUDE SEEK_NEXT_USING_HINT SEEK_NEXT_USING_HINT + * INCLUDE INCLUDE INCLUDE + * INCLUDE INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL + * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_COL SKIP SEEK_NEXT_COL + * INCLUDE_AND_SEEK_NEXT_COL NEXT_COL SEEK_NEXT_COL or SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_COL NEXT_ROW SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_COL SEEK_NEXT_USING_HINT SEEK_NEXT_USING_HINT + * INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL + * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL + * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW SKIP SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW NEXT_COL SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW NEXT_ROW SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW SEEK_NEXT_USING_HINT SEEK_NEXT_USING_HINT + * INCLUDE_AND_SEEK_NEXT_ROW INCLUDE INCLUDE_AND_SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW + * INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW + */ + private final MatchCode mergeFilterResponse(Cell cell, MatchCode matchCode, + ReturnCode filterResponse) { + switch (filterResponse) { + case SKIP: + if (matchCode == MatchCode.INCLUDE) { + return MatchCode.SKIP; + } else if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + return MatchCode.SEEK_NEXT_COL; + } else if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + return MatchCode.SEEK_NEXT_ROW; + } + break; + case NEXT_COL: + if (matchCode == 
MatchCode.INCLUDE || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + return columns.getNextRowOrNextColumn(cell); + } else if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + return MatchCode.SEEK_NEXT_ROW; + } + break; + case NEXT_ROW: + return MatchCode.SEEK_NEXT_ROW; + case SEEK_NEXT_USING_HINT: + return MatchCode.SEEK_NEXT_USING_HINT; + case INCLUDE: + break; + case INCLUDE_AND_NEXT_COL: + if (matchCode == MatchCode.INCLUDE) { + matchCode = MatchCode.INCLUDE_AND_SEEK_NEXT_COL; + } + break; + case INCLUDE_AND_SEEK_NEXT_ROW: + break; + default: + throw new RuntimeException("UNEXPECTED"); + } + + // It means it is INCLUDE, INCLUDE_AND_SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_ROW. + assert matchCode == MatchCode.INCLUDE || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL + || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; + + if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL + || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + return matchCode; + } + + // Now we will check versions again. + if (curColCell == null || !CellUtil.matchingRowColumn(cell, curColCell)) { + count = 0; + curColCell = cell; + } + count += 1; + return count > versionsAfterFilter ? 
MatchCode.SEEK_NEXT_COL : MatchCode.INCLUDE; } protected abstract boolean isGet(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index a93fbb20ec9..8a3841e38d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -88,6 +88,8 @@ import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.SubstringComparator; +import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.BlockCache; @@ -6424,4 +6426,197 @@ public class TestFromClientSide { } } + @Test + public void testDeleteSpecifiedVersionOfSpecifiedColumn() throws Exception { + Admin admin = TEST_UTIL.getAdmin(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + + byte[][] VALUES = makeN(VALUE, 5); + long[] ts = { 1000, 2000, 3000, 4000, 5000 }; + + Table ht = TEST_UTIL.createTable(tableName, FAMILY, 5); + + Put put = new Put(ROW); + // Put version 1000,2000,3000,4000 of column FAMILY:QUALIFIER + for (int t = 0; t < 4; t++) { + put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]); + } + ht.put(put); + + Delete delete = new Delete(ROW); + // Delete version 3000 of column FAMILY:QUALIFIER + delete.addColumn(FAMILY, QUALIFIER, ts[2]); + ht.delete(delete); + + Get get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + Result result = ht.get(get); + // verify version 1000,2000,4000 remains for column FAMILY:QUALIFIER + assertNResult(result, ROW, 
FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[3] }, new byte[][] { + VALUES[0], VALUES[1], VALUES[3] }, 0, 2); + + delete = new Delete(ROW); + // Delete a version 5000 of column FAMILY:QUALIFIER which didn't exist + delete.addColumn(FAMILY, QUALIFIER, ts[4]); + ht.delete(delete); + + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + // verify version 1000,2000,4000 remains for column FAMILY:QUALIFIER + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[3] }, new byte[][] { + VALUES[0], VALUES[1], VALUES[3] }, 0, 2); + + ht.close(); + admin.close(); + } + + @Test + public void testDeleteLatestVersionOfSpecifiedColumn() throws Exception { + Admin admin = TEST_UTIL.getAdmin(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + + byte[][] VALUES = makeN(VALUE, 5); + long[] ts = { 1000, 2000, 3000, 4000, 5000 }; + + Table ht = TEST_UTIL.createTable(tableName, FAMILY, 5); + + Put put = new Put(ROW); + // Put version 1000,2000,3000,4000 of column FAMILY:QUALIFIER + for (int t = 0; t < 4; t++) { + put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]); + } + ht.put(put); + + Delete delete = new Delete(ROW); + // Delete latest version of column FAMILY:QUALIFIER + delete.addColumn(FAMILY, QUALIFIER); + ht.delete(delete); + + Get get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + Result result = ht.get(get); + // verify version 1000,2000,3000 remains for column FAMILY:QUALIFIER + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[2] }, new byte[][] { + VALUES[0], VALUES[1], VALUES[2] }, 0, 2); + + delete = new Delete(ROW); + // Delete two latest version of column FAMILY:QUALIFIER + delete.addColumn(FAMILY, QUALIFIER); + delete.addColumn(FAMILY, QUALIFIER); + ht.delete(delete); + + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = 
ht.get(get); + // verify version 1000 remains for column FAMILY:QUALIFIER + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0] }, new byte[][] { VALUES[0] }, + 0, 0); + + put = new Put(ROW); + // Put a version 5000 of column FAMILY:QUALIFIER + put.addColumn(FAMILY, QUALIFIER, ts[4], VALUES[4]); + ht.put(put); + + get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(Integer.MAX_VALUE); + result = ht.get(get); + // verify version 1000,5000 remains for column FAMILY:QUALIFIER + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[4] }, new byte[][] { + VALUES[0], VALUES[4] }, 0, 1); + + ht.close(); + admin.close(); + } + + /** + * Test for HBASE-17125 + */ + @Test + public void testReadWithFilter() throws Exception { + Admin admin = TEST_UTIL.getAdmin(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + Table table = TEST_UTIL.createTable(tableName, FAMILY, 3); + + byte[] VALUEA = Bytes.toBytes("value-a"); + byte[] VALUEB = Bytes.toBytes("value-b"); + long[] ts = { 1000, 2000, 3000, 4000 }; + + Put put = new Put(ROW); + // Put version 1000,2000,3000,4000 of column FAMILY:QUALIFIER + for (int t = 0; t <= 3; t++) { + if (t <= 1) { + put.addColumn(FAMILY, QUALIFIER, ts[t], VALUEA); + } else { + put.addColumn(FAMILY, QUALIFIER, ts[t], VALUEB); + } + } + table.put(put); + + Scan scan = + new Scan().setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a"))) + .setMaxVersions(3); + ResultScanner scanner = table.getScanner(scan); + Result result = scanner.next(); + // ts[0] has gone from user view. Only ts[1] is visible and matches the value filter + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0, + 0); + + Get get = + new Get(ROW) + .setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a"))) + .setMaxVersions(3); + result = table.get(get); + // ts[0] has gone from user view. 
Only ts[1] is visible and matches the value filter + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0, + 0); + + // Test with max versions 1, it should still read ts[1] + scan = + new Scan().setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a"))) + .setMaxVersions(1); + scanner = table.getScanner(scan); + result = scanner.next(); + // ts[0] has gone from user view. Only ts[1] is visible and matches the value filter + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0, + 0); + + // Test with max versions 1, it should still read ts[1] + get = + new Get(ROW) + .setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a"))) + .setMaxVersions(1); + result = table.get(get); + // ts[0] has gone from user view. Only ts[1] is visible and matches the value filter + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0, + 0); + + // Test with max versions 5, it should still read ts[1] + scan = + new Scan().setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a"))) + .setMaxVersions(5); + scanner = table.getScanner(scan); + result = scanner.next(); + // ts[0] has gone from user view. Only ts[1] is visible and matches the value filter + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0, + 0); + + // Test with max versions 5, it should still read ts[1] + get = + new Get(ROW) + .setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a"))) + .setMaxVersions(5); + result = table.get(get); + // ts[0] has gone from user view. 
Only ts[1] is visible and matches the value filter + assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0, + 0); + + table.close(); + admin.close(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 9db7c160f22..b8020d876a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -123,6 +123,8 @@ import org.apache.hadoop.hbase.filter.NullComparator; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.SubstringComparator; +import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -2638,6 +2640,68 @@ public class TestHRegion { } } + @Test + public void testGetWithFilter() throws IOException, InterruptedException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] fam1 = Bytes.toBytes("fam1"); + byte[] col1 = Bytes.toBytes("col1"); + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + + final int maxVersions = 3; + HColumnDescriptor hcd = new HColumnDescriptor(fam1); + hcd.setMaxVersions(maxVersions); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker")); + htd.addFamily(hcd); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); + Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); + final WAL wal = 
HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info); + this.region = TEST_UTIL.createLocalHRegion(info, htd, wal); + + try { + // Put 4 versions into the memstore + long ts = 0; + Put put = new Put(row1, ts); + put.addColumn(fam1, col1, value1); + region.put(put); + put = new Put(row1, ts + 1); + put.addColumn(fam1, col1, Bytes.toBytes("filter1")); + region.put(put); + put = new Put(row1, ts + 2); + put.addColumn(fam1, col1, Bytes.toBytes("filter2")); + region.put(put); + put = new Put(row1, ts + 3); + put.addColumn(fam1, col1, value2); + region.put(put); + + Get get = new Get(row1); + get.setMaxVersions(); + Result res = region.get(get); + // Get 3 versions, the oldest version has gone from user view + assertEquals(maxVersions, res.size()); + + get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value"))); + res = region.get(get); + // When using a value filter, the oldest version should still be gone from user view and + // it should only return one key value + assertEquals(1, res.size()); + assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); + assertEquals(ts + 3, res.rawCells()[0].getTimestamp()); + + region.flush(true); + region.compact(true); + Thread.sleep(1000); + res = region.get(get); + // After flush and compact, the result should be consistent with previous result + assertEquals(1, res.size()); + assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + } + } + // //////////////////////////////////////////////////////////////////////////// // Scanner tests // //////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java index 52b5a409862..e8d60e67c7a 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java @@ -431,24 +431,27 @@ public class TestMinVersions { tss.add(ts-1); tss.add(ts-2); + // Should only get T2, versions is 2, so T1 is gone from user view. Get g = new Get(T1); g.addColumn(c1,c1); g.setFilter(new TimestampsFilter(tss)); g.setMaxVersions(); Result r = region.get(g); - checkResult(r, c1, T2,T1); + checkResult(r, c1, T2); + // Should only get T2, versions is 2, so T1 is gone from user view. g = new Get(T1); g.addColumn(c0,c0); g.setFilter(new TimestampsFilter(tss)); g.setMaxVersions(); r = region.get(g); - checkResult(r, c0, T2,T1); + checkResult(r, c0, T2); // now flush/compact region.flush(true); region.compact(true); + // After flush/compact, the result should be consistent with previous result g = new Get(T1); g.addColumn(c1,c1); g.setFilter(new TimestampsFilter(tss)); @@ -456,6 +459,7 @@ public class TestMinVersions { r = region.get(g); checkResult(r, c1, T2); + // After flush/compact, the result should be consistent with previous result g = new Get(T1); g.addColumn(c0,c0); g.setFilter(new TimestampsFilter(tss)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 49ebcad0d2c..3a2704b3c70 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -1061,10 +1061,11 @@ public class TestStore { @Test public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); + final int expectedSize = 3; testFlushBeforeCompletingScan(new MyListHook() { @Override public void hook(int currentSize) { - if (currentSize == 2) { + if (currentSize == 
expectedSize - 1) { try { flushStore(store, id++); timeToGoNextRow.set(true); @@ -1078,16 +1079,17 @@ public class TestStore { public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { return ReturnCode.INCLUDE; } - }); + }, expectedSize); } @Test public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); + final int expectedSize = 2; testFlushBeforeCompletingScan(new MyListHook() { @Override public void hook(int currentSize) { - if (currentSize == 2) { + if (currentSize == expectedSize - 1) { try { flushStore(store, id++); timeToGoNextRow.set(true); @@ -1106,16 +1108,17 @@ public class TestStore { return ReturnCode.INCLUDE; } } - }); + }, expectedSize); } @Test public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException { final AtomicBoolean timeToGetHint = new AtomicBoolean(false); + final int expectedSize = 2; testFlushBeforeCompletingScan(new MyListHook() { @Override public void hook(int currentSize) { - if (currentSize == 2) { + if (currentSize == expectedSize - 1) { try { flushStore(store, id++); timeToGetHint.set(true); @@ -1138,10 +1141,10 @@ public class TestStore { public Cell getNextCellHint(Cell currentCell) throws IOException { return currentCell; } - }); + }, expectedSize); } - private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter) + private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create(); HColumnDescriptor hcd = new HColumnDescriptor(family); @@ -1182,7 +1185,7 @@ public class TestStore { scan, null, seqId + 3)){ // r1 scanner.next(myList); - assertEquals(3, myList.size()); + assertEquals(expectedSize, myList.size()); for (Cell c : myList) { byte[] actualValue = CellUtil.cloneValue(c); assertTrue("expected:" + Bytes.toStringBinary(value1)