From dc56aa2d4f26ba515ad8b830b83370649648e518 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 31 Jul 2016 19:55:41 +0800 Subject: [PATCH] HBASE-16225 Refactor ScanQueryMatcher --- .../org/apache/hadoop/hbase/io/TimeRange.java | 9 +- .../hbase/coprocessor/BaseRegionObserver.java | 2 +- .../hbase/coprocessor/RegionObserver.java | 2 +- .../hadoop/hbase/regionserver/HStore.java | 3 +- .../regionserver/RegionCoprocessorHost.java | 1 + .../hadoop/hbase/regionserver/ScanInfo.java | 6 +- .../hbase/regionserver/ScanQueryMatcher.java | 706 ------------------ .../hadoop/hbase/regionserver/Store.java | 1 + .../hadoop/hbase/regionserver/StoreFile.java | 35 +- .../hbase/regionserver/StoreFileScanner.java | 63 +- .../hbase/regionserver/StoreScanner.java | 91 ++- .../{ => querymatcher}/ColumnCount.java | 4 +- .../{ => querymatcher}/ColumnTracker.java | 8 +- .../CompactionScanQueryMatcher.java | 119 +++ .../{ => querymatcher}/DeleteTracker.java | 31 +- ...DropDeletesCompactionScanQueryMatcher.java | 84 +++ .../ExplicitColumnTracker.java | 105 ++- .../querymatcher/LegacyScanQueryMatcher.java | 399 ++++++++++ .../MajorCompactionScanQueryMatcher.java | 81 ++ .../MinorCompactionScanQueryMatcher.java | 62 ++ .../NormalUserScanQueryMatcher.java | 106 +++ .../querymatcher/RawScanQueryMatcher.java | 77 ++ .../{ => querymatcher}/ScanDeleteTracker.java | 49 +- .../querymatcher/ScanQueryMatcher.java | 328 ++++++++ .../ScanWildcardColumnTracker.java | 74 +- .../StripeCompactionScanQueryMatcher.java | 119 +++ .../querymatcher/UserScanQueryMatcher.java | 213 ++++++ .../visibility/VisibilityController.java | 2 +- .../VisibilityScanDeleteTracker.java | 2 +- .../regionserver/DataBlockEncodingTool.java | 2 +- .../EncodedSeekPerformanceTest.java | 4 +- .../regionserver/TestCompoundBloomFilter.java | 2 +- .../hbase/regionserver/TestQueryMatcher.java | 379 ---------- .../regionserver/TestScanDeleteTracker.java | 186 ----- .../hbase/regionserver/TestStoreFile.java | 23 +- ...estStoreFileScannerWithTagCompression.java | 2 +- .../hbase/regionserver/TestStoreScanner.java | 6 +- .../compactions/TestCompactor.java | 4 +- .../TestStripeCompactionPolicy.java | 5 +- .../AbstractTestScanQueryMatcher.java | 78 ++ .../TestCompactionScanQueryMatcher.java | 99 +++ .../TestExplicitColumnTracker.java | 84 +-- .../querymatcher/TestScanDeleteTracker.java | 185 +++++ .../TestScanWildcardColumnTracker.java | 76 +- .../TestUserScanQueryMatcher.java | 236 ++++++ 45 files changed, 2503 insertions(+), 1650 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{ => querymatcher}/ColumnCount.java (97%) rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{ => querymatcher}/ColumnTracker.java (95%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{ => querymatcher}/DeleteTracker.java (81%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{ => querymatcher}/ExplicitColumnTracker.java (75%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java create mode 100644 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{ => querymatcher}/ScanDeleteTracker.java (84%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{ => querymatcher}/ScanWildcardColumnTracker.java (75%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanDeleteTracker.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/AbstractTestScanQueryMatcher.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/{ => querymatcher}/TestExplicitColumnTracker.java (70%) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestScanDeleteTracker.java rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/{ => querymatcher}/TestScanWildcardColumnTracker.java (63%) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java index 8f2369477dd..d325f7cd305 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java @@ -187,12 +187,11 @@ public class TimeRange { } /** - * Check if the specified timestamp is within this TimeRange. + * Check if the specified timestamp is within or after this TimeRange. *

- * Returns true if within interval [minStamp, maxStamp), false - * if not. + * Returns true if greater than minStamp, false if not. * @param timestamp timestamp to check - * @return true if within TimeRange, false if not + * @return true if within or after TimeRange, false if not */ public boolean withinOrAfterTimeRange(long timestamp) { assert timestamp >= 0; @@ -229,4 +228,4 @@ public class TimeRange { sb.append(this.minStamp); return sb.toString(); } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 1bf7449195c..eb2fc28f5cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.regionserver.DeleteTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; @@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 8c5c15ab19a..42d5cdb4fa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.regionserver.DeleteTracker; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -56,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 66eaebc6632..b598f09f4e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.EncryptionUtil; @@ -1863,7 +1864,7 @@ public class HStore implements Store { * @param oldestTimestamp * @return true if the cell is expired */ - static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) { + public static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) { // Do not create an Iterator or Tag objects unless the cell actually has tags. // TODO: This check for tags is really expensive. We decode an int for key and value. Costs. if (cell.getTagsLength() > 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 814370cdf2b..28d21290270 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java index d00b08784f6..804bfbc33e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java @@ -121,11 +121,11 @@ public class ScanInfo { return this.parallelSeekEnabled; } - byte[] getFamily() { + public byte[] getFamily() { return family; } - int getMinVersions() { + public int getMinVersions() { return minVersions; } @@ -137,7 +137,7 @@ public class ScanInfo { return ttl; } - KeepDeletedCells getKeepDeletedCells() { + public KeepDeletedCells getKeepDeletedCells() { return keepDeletedCells; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java deleted file mode 100644 index 7dfe7c2d6b7..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ /dev/null @@ -1,706 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.NavigableSet; - -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeepDeletedCells; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.Filter.ReturnCode; -import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -import com.google.common.base.Preconditions; - -/** - * A query matcher that is specifically designed for the scan case. - */ -@InterfaceAudience.Private -public class ScanQueryMatcher { - // Optimization so we can skip lots of compares when we decide to skip - // to the next row. - private boolean stickyNextRow; - private final byte[] stopRow; - - private final TimeRange tr; - - private final Filter filter; - - /** Keeps track of deletes */ - private final DeleteTracker deletes; - - /* - * The following three booleans define how we deal with deletes. - * There are three different aspects: - * 1. Whether to keep delete markers. This is used in compactions. - * Minor compactions always keep delete markers. - * 2. Whether to keep deleted rows. This is also used in compactions, - * if the store is set to keep deleted rows. This implies keeping - * the delete markers as well. - * In this case deleted rows are subject to the normal max version - * and TTL/min version rules just like "normal" rows. - * 3. Whether a scan can do time travel queries even before deleted - * marker to reach deleted rows. - */ - /** whether to retain delete markers */ - private boolean retainDeletesInOutput; - - /** whether to return deleted rows */ - private final KeepDeletedCells keepDeletedCells; - /** whether time range queries can see rows "behind" a delete */ - private final boolean seePastDeleteMarkers; - - - /** Keeps track of columns and versions */ - private final ColumnTracker columns; - - /** Key to seek to in memstore and StoreFiles */ - private final Cell startKey; - - /** Row comparator for the region this query is for */ - private final KeyValue.KVComparator rowComparator; - - /* row is not private for tests */ - /** Row the query is on */ - byte [] row; - int rowOffset; - short rowLength; - - /** - * Oldest put in any of the involved store files - * Used to decide whether it is ok to delete - * family delete marker of this store keeps - * deleted KVs. 
- */ - private final long earliestPutTs; - private final long ttl; - - /** The oldest timestamp we are interested in, based on TTL */ - private final long oldestUnexpiredTS; - private final long now; - - /** readPoint over which the KVs are unconditionally included */ - protected long maxReadPointToTrackVersions; - - private byte[] dropDeletesFromRow = null, dropDeletesToRow = null; - - /** - * This variable shows whether there is an null column in the query. There - * always exists a null column in the wildcard column query. - * There maybe exists a null column in the explicit column query based on the - * first column. - * */ - private boolean hasNullColumn = true; - - private RegionCoprocessorHost regionCoprocessorHost= null; - - // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete - // marker is always removed during a major compaction. If set to non-zero - // value then major compaction will try to keep a delete marker around for - // the given number of milliseconds. We want to keep the delete markers - // around a bit longer because old puts might appear out-of-order. For - // example, during log replication between two clusters. - // - // If the delete marker has lived longer than its column-family's TTL then - // the delete marker will be removed even if time.to.purge.deletes has not - // passed. This is because all the Puts that this delete marker can influence - // would have also expired. (Removing of delete markers on col family TTL will - // not happen if min-versions is set to non-zero) - // - // But, if time.to.purge.deletes has not expired then a delete - // marker will not be removed just because there are no Puts that it is - // currently influencing. This is because Puts, that this delete can - // influence. may appear out of order. - private final long timeToPurgeDeletes; - - private final boolean isUserScan; - - private final boolean isReversed; - - /** - * True if we are doing a 'Get' Scan. Every Get is actually a one-row Scan. - */ - private final boolean get; - - /** - * Construct a QueryMatcher for a scan - * @param scanInfo The store's immutable scan info - * @param scanType Type of the scan - * @param earliestPutTs Earliest put seen in any of the store files. - * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL - */ - public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet columns, - ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, - long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException { - TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily()); - if (timeRange == null) { - this.tr = scan.getTimeRange(); - } else { - this.tr = timeRange; - } - this.get = scan.isGetScan(); - this.rowComparator = scanInfo.getComparator(); - this.regionCoprocessorHost = regionCoprocessorHost; - this.deletes = instantiateDeleteTracker(); - this.stopRow = scan.getStopRow(); - this.startKey = KeyValueUtil.createFirstDeleteFamilyOnRow(scan.getStartRow(), - scanInfo.getFamily()); - this.filter = scan.getFilter(); - this.earliestPutTs = earliestPutTs; - this.oldestUnexpiredTS = oldestUnexpiredTS; - this.now = now; - - this.maxReadPointToTrackVersions = readPointToUse; - this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes(); - this.ttl = oldestUnexpiredTS; - - /* how to deal with deletes */ - this.isUserScan = scanType == ScanType.USER_SCAN; - // keep deleted cells: if compaction or raw scan - this.keepDeletedCells = scan.isRaw() ? 
KeepDeletedCells.TRUE : - isUserScan ? KeepDeletedCells.FALSE : scanInfo.getKeepDeletedCells(); - // retain deletes: if minor compaction or raw scanisDone - this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES || scan.isRaw(); - // seePastDeleteMarker: user initiated scans - this.seePastDeleteMarkers = - scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE && isUserScan; - - int maxVersions = - scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(), - scanInfo.getMaxVersions()); - - // Single branch to deal with two types of reads (columns vs all in family) - if (columns == null || columns.size() == 0) { - // there is always a null column in the wildcard column query. - hasNullColumn = true; - - // use a specialized scan for wildcard column tracker. - this.columns = new ScanWildcardColumnTracker( - scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS); - } else { - // whether there is null column in the explicit column query - hasNullColumn = (columns.first().length == 0); - - // We can share the ExplicitColumnTracker, diff is we reset - // between rows, not between storefiles. - this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, - oldestUnexpiredTS); - } - this.isReversed = scan.isReversed(); - } - - private DeleteTracker instantiateDeleteTracker() throws IOException { - DeleteTracker tracker = new ScanDeleteTracker(); - if (regionCoprocessorHost != null) { - tracker = regionCoprocessorHost.postInstantiateDeleteTracker(tracker); - } - return tracker; - } - - /** - * Construct a QueryMatcher for a scan that drop deletes from a limited range of rows. - * @param scan - * @param scanInfo The store's immutable scan info - * @param columns - * @param earliestPutTs Earliest put seen in any of the store files. - * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL - * @param now the current server time - * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. - * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. 
- * @param regionCoprocessorHost - * @throws IOException - */ - public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet columns, - long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, - byte[] dropDeletesFromRow, byte[] dropDeletesToRow, - RegionCoprocessorHost regionCoprocessorHost) throws IOException { - this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs, - oldestUnexpiredTS, now, regionCoprocessorHost); - Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null)); - this.dropDeletesFromRow = dropDeletesFromRow; - this.dropDeletesToRow = dropDeletesToRow; - } - - /* - * Constructor for tests - */ - ScanQueryMatcher(Scan scan, ScanInfo scanInfo, - NavigableSet columns, long oldestUnexpiredTS, long now) throws IOException { - this(scan, scanInfo, columns, ScanType.USER_SCAN, - Long.MAX_VALUE, /* max Readpoint to track versions */ - HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null); - } - - /** - * - * @return whether there is an null column in the query - */ - public boolean hasNullColumnInQuery() { - return hasNullColumn; - } - - /** - * Determines if the caller should do one of several things: - * - seek/skip to the next row (MatchCode.SEEK_NEXT_ROW) - * - seek/skip to the next column (MatchCode.SEEK_NEXT_COL) - * - include the current KeyValue (MatchCode.INCLUDE) - * - ignore the current KeyValue (MatchCode.SKIP) - * - got to the next row (MatchCode.DONE) - * - * @param cell KeyValue to check - * @return The match code instance. - * @throws IOException in case there is an internal consistency problem - * caused by a data corruption. - */ - public MatchCode match(Cell cell) throws IOException { - if (filter != null && filter.filterAllRemaining()) { - return MatchCode.DONE_SCAN; - } - if (row != null) { - int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength, - cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - if (!this.isReversed) { - if (ret <= -1) { - return MatchCode.DONE; - } else if (ret >= 1) { - // could optimize this, if necessary? - // Could also be called SEEK_TO_CURRENT_ROW, but this - // should be rare/never happens. - return MatchCode.SEEK_NEXT_ROW; - } - } else { - if (ret <= -1) { - return MatchCode.SEEK_NEXT_ROW; - } else if (ret >= 1) { - return MatchCode.DONE; - } - } - } else { - return MatchCode.DONE; - } - - // optimize case. - if (this.stickyNextRow) - return MatchCode.SEEK_NEXT_ROW; - - if (this.columns.done()) { - stickyNextRow = true; - return MatchCode.SEEK_NEXT_ROW; - } - - int qualifierOffset = cell.getQualifierOffset(); - int qualifierLength = cell.getQualifierLength(); - - long timestamp = cell.getTimestamp(); - // check for early out based on timestamp alone - if (columns.isDone(timestamp)) { - return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset, - qualifierLength); - } - // check if the cell is expired by cell TTL - if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) { - return MatchCode.SKIP; - } - - /* - * The delete logic is pretty complicated now. - * This is corroborated by the following: - * 1. The store might be instructed to keep deleted rows around. - * 2. A scan can optionally see past a delete marker now. - * 3. If deleted rows are kept, we have to find out when we can - * remove the delete markers. - * 4. Family delete markers are always first (regardless of their TS) - * 5. Delete markers should not be counted as version - * 6. 
Delete markers affect puts of the *same* TS - * 7. Delete marker need to be version counted together with puts - * they affect - */ - byte typeByte = cell.getTypeByte(); - long mvccVersion = cell.getMvccVersion(); - if (CellUtil.isDelete(cell)) { - if (keepDeletedCells == KeepDeletedCells.FALSE - || (keepDeletedCells == KeepDeletedCells.TTL && timestamp < ttl)) { - // first ignore delete markers if the scanner can do so, and the - // range does not include the marker - // - // during flushes and compactions also ignore delete markers newer - // than the readpoint of any open scanner, this prevents deleted - // rows that could still be seen by a scanner from being collected - boolean includeDeleteMarker = seePastDeleteMarkers ? - tr.withinTimeRange(timestamp) : - tr.withinOrAfterTimeRange(timestamp); - if (includeDeleteMarker - && mvccVersion <= maxReadPointToTrackVersions) { - this.deletes.add(cell); - } - // Can't early out now, because DelFam come before any other keys - } - - if ((!isUserScan) - && timeToPurgeDeletes > 0 - && (EnvironmentEdgeManager.currentTime() - timestamp) - <= timeToPurgeDeletes) { - return MatchCode.INCLUDE; - } else if (retainDeletesInOutput || mvccVersion > maxReadPointToTrackVersions) { - // always include or it is not time yet to check whether it is OK - // to purge deltes or not - if (!isUserScan) { - // if this is not a user scan (compaction), we can filter this deletemarker right here - // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking - return MatchCode.INCLUDE; - } - } else if (keepDeletedCells == KeepDeletedCells.TRUE - || (keepDeletedCells == KeepDeletedCells.TTL && timestamp >= ttl)) { - if (timestamp < earliestPutTs) { - // keeping delete rows, but there are no puts older than - // this delete in the store files. - return columns.getNextRowOrNextColumn(cell.getQualifierArray(), - qualifierOffset, qualifierLength); - } - // else: fall through and do version counting on the - // delete markers - } else { - return MatchCode.SKIP; - } - // note the following next else if... - // delete marker are not subject to other delete markers - } else if (!this.deletes.isEmpty()) { - DeleteResult deleteResult = deletes.isDeleted(cell); - switch (deleteResult) { - case FAMILY_DELETED: - case COLUMN_DELETED: - return columns.getNextRowOrNextColumn(cell.getQualifierArray(), - qualifierOffset, qualifierLength); - case VERSION_DELETED: - case FAMILY_VERSION_DELETED: - return MatchCode.SKIP; - case NOT_DELETED: - break; - default: - throw new RuntimeException("UNEXPECTED"); - } - } - - // NOTE: Cryptic stuff! - // if the timestamp is HConstants.OLDEST_TIMESTAMP, then this is a fake cell made to prime a - // Scanner; See KeyValueUTil#createLastOnRow. This Cell should never end up returning out of - // here a matchcode of INCLUDE else we will return to the client a fake Cell. If we call - // TimeRange, it will return 0 because it doesn't deal in OLDEST_TIMESTAMP and we will fall - // into the later code where we could return a matchcode of INCLUDE. See HBASE-16074 "ITBLL - // fails, reports lost big or tiny families" for a horror story. Check here for - // OLDEST_TIMESTAMP. TimeRange#compare is about more generic timestamps, between 0L and - // Long.MAX_LONG. It doesn't do OLDEST_TIMESTAMP weird handling. - int timestampComparison = timestamp == HConstants.OLDEST_TIMESTAMP? 
-1: tr.compare(timestamp); - if (timestampComparison >= 1) { - return MatchCode.SKIP; - } else if (timestampComparison <= -1) { - return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset, - qualifierLength); - } - - // STEP 1: Check if the column is part of the requested columns - MatchCode colChecker = columns.checkColumn(cell.getQualifierArray(), - qualifierOffset, qualifierLength, typeByte); - if (colChecker == MatchCode.INCLUDE) { - ReturnCode filterResponse = ReturnCode.SKIP; - // STEP 2: Yes, the column is part of the requested columns. Check if filter is present - if (filter != null) { - // STEP 3: Filter the key value and return if it filters out - filterResponse = filter.filterKeyValue(cell); - switch (filterResponse) { - case SKIP: - return MatchCode.SKIP; - case NEXT_COL: - return columns.getNextRowOrNextColumn(cell.getQualifierArray(), - qualifierOffset, qualifierLength); - case NEXT_ROW: - stickyNextRow = true; - return MatchCode.SEEK_NEXT_ROW; - case SEEK_NEXT_USING_HINT: - return MatchCode.SEEK_NEXT_USING_HINT; - default: - //It means it is either include or include and seek next - break; - } - } - /* - * STEP 4: Reaching this step means the column is part of the requested columns and either - * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response. - * Now check the number of versions needed. This method call returns SKIP, INCLUDE, - * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL. - * - * FilterResponse ColumnChecker Desired behavior - * INCLUDE SKIP row has already been included, SKIP. - * INCLUDE INCLUDE INCLUDE - * INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP. - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * - * In all the above scenarios, we return the column checker return value except for - * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE) - */ - colChecker = - columns.checkVersions(cell.getQualifierArray(), qualifierOffset, - qualifierLength, timestamp, typeByte, - mvccVersion > maxReadPointToTrackVersions); - //Optimize with stickyNextRow - stickyNextRow = colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? true : stickyNextRow; - return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && - colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL - : colChecker; - } - stickyNextRow = (colChecker == MatchCode.SEEK_NEXT_ROW) ? true - : stickyNextRow; - return colChecker; - } - - /** Handle partial-drop-deletes. As we match keys in order, when we have a range from which - * we can drop deletes, we can set retainDeletesInOutput to false for the duration of this - * range only, and maintain consistency. */ - private void checkPartialDropDeleteRange(byte [] row, int offset, short length) { - // If partial-drop-deletes are used, initially, dropDeletesFromRow and dropDeletesToRow - // are both set, and the matcher is set to retain deletes. We assume ordered keys. When - // dropDeletesFromRow is leq current kv, we start dropping deletes and reset - // dropDeletesFromRow; thus the 2nd "if" starts to apply. 
- if ((dropDeletesFromRow != null) - && ((dropDeletesFromRow == HConstants.EMPTY_START_ROW) - || (Bytes.compareTo(row, offset, length, - dropDeletesFromRow, 0, dropDeletesFromRow.length) >= 0))) { - retainDeletesInOutput = false; - dropDeletesFromRow = null; - } - // If dropDeletesFromRow is null and dropDeletesToRow is set, we are inside the partial- - // drop-deletes range. When dropDeletesToRow is leq current kv, we stop dropping deletes, - // and reset dropDeletesToRow so that we don't do any more compares. - if ((dropDeletesFromRow == null) - && (dropDeletesToRow != null) && (dropDeletesToRow != HConstants.EMPTY_END_ROW) - && (Bytes.compareTo(row, offset, length, - dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) { - retainDeletesInOutput = true; - dropDeletesToRow = null; - } - } - - /** - * @return Returns false if we know there are no more rows to be scanned (We've reached the - * stopRow or we are scanning on row only because this Scan is for a Get, etc. - */ - public boolean moreRowsMayExistAfter(Cell kv) { - // If a 'get' Scan -- we are doing a Get (every Get is a single-row Scan in implementation) -- - // then we are looking at one row only, the one specified in the Get coordinate..so we know - // for sure that there are no more rows on this Scan - if (this.get) { - return false; - } - // If no stopRow, return that there may be more rows. The tests that follow depend on a - // non-empty, non-default stopRow so this little test below short-circuits out doing the - // following compares. - if (this.stopRow == null || this.stopRow == HConstants.EMPTY_BYTE_ARRAY) { - return true; - } - return this.isReversed? - rowComparator.compareRows(kv, stopRow, 0, stopRow.length) > 0: - Bytes.equals(stopRow, HConstants.EMPTY_END_ROW) || - rowComparator.compareRows(kv, stopRow, 0, stopRow.length) < 0; - } - - /** - * Set current row - * @param row - */ - public void setRow(byte [] row, int offset, short length) { - checkPartialDropDeleteRange(row, offset, length); - this.row = row; - this.rowOffset = offset; - this.rowLength = length; - reset(); - } - - public void reset() { - this.deletes.reset(); - this.columns.reset(); - - stickyNextRow = false; - } - - /** - * - * @return the start key - */ - public Cell getStartKey() { - return this.startKey; - } - - /** - * - * @return the Filter - */ - Filter getFilter() { - return this.filter; - } - - public Cell getNextKeyHint(Cell kv) throws IOException { - if (filter == null) { - return null; - } else { - return filter.getNextCellHint(kv); - } - } - - public Cell getKeyForNextColumn(Cell kv) { - ColumnCount nextColumn = columns.getColumnHint(); - if (nextColumn == null) { - return KeyValueUtil.createLastOnRow( - kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), - kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()); - } else { - return KeyValueUtil.createFirstOnRow( - kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), - nextColumn.getBuffer(), nextColumn.getOffset(), nextColumn.getLength()); - } - } - - /** - * @param nextIndexed the key of the next entry in the block index (if any) - * @param kv The Cell we're using to calculate the seek key - * @return result of the compare between the indexed key and the key portion of the passed cell - */ - public int compareKeyForNextRow(Cell nextIndexed, Cell kv) { - return rowComparator.compareKey(nextIndexed, - kv.getRowArray(), 
kv.getRowOffset(), kv.getRowLength(), - null, 0, 0, - null, 0, 0, - HConstants.OLDEST_TIMESTAMP, Type.Minimum.getCode()); - } - - /** - * @param nextIndexed the key of the next entry in the block index (if any) - * @param kv The Cell we're using to calculate the seek key - * @return result of the compare between the indexed key and the key portion of the passed cell - */ - public int compareKeyForNextColumn(Cell nextIndexed, Cell kv) { - ColumnCount nextColumn = columns.getColumnHint(); - if (nextColumn == null) { - return rowComparator.compareKey(nextIndexed, - kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), - kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), - HConstants.OLDEST_TIMESTAMP, Type.Minimum.getCode()); - } else { - return rowComparator.compareKey(nextIndexed, - kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), - nextColumn.getBuffer(), nextColumn.getOffset(), nextColumn.getLength(), - HConstants.LATEST_TIMESTAMP, Type.Maximum.getCode()); - } - } - - boolean isUserScan() { - return this.isUserScan; - } - - //Used only for testing purposes - static MatchCode checkColumn(ColumnTracker columnTracker, byte[] bytes, int offset, - int length, long ttl, byte type, boolean ignoreCount) throws IOException { - MatchCode matchCode = columnTracker.checkColumn(bytes, offset, length, type); - if (matchCode == MatchCode.INCLUDE) { - return columnTracker.checkVersions(bytes, offset, length, ttl, type, ignoreCount); - } - return matchCode; - } - - /** - * {@link #match} return codes. These instruct the scanner moving through - * memstores and StoreFiles what to do with the current KeyValue. - *

- * Additionally, this contains "early-out" language to tell the scanner to - * move on to the next File (memstore or Storefile), or to return immediately. - */ - public static enum MatchCode { - /** - * Include KeyValue in the returned result - */ - INCLUDE, - - /** - * Do not include KeyValue in the returned result - */ - SKIP, - - /** - * Do not include, jump to next StoreFile or memstore (in time order) - */ - NEXT, - - /** - * Do not include, return current result - */ - DONE, - - /** - * These codes are used by the ScanQueryMatcher - */ - - /** - * Done with the row, seek there. - */ - SEEK_NEXT_ROW, - /** - * Done with column, seek to next. - */ - SEEK_NEXT_COL, - - /** - * Done with scan, thanks to the row filter. - */ - DONE_SCAN, - - /* - * Seek to next key which is given as hint. - */ - SEEK_NEXT_USING_HINT, - - /** - * Include KeyValue and done with column, seek to next. - */ - INCLUDE_AND_SEEK_NEXT_COL, - - /** - * Include KeyValue and done with row, seek to next. - */ - INCLUDE_AND_SEEK_NEXT_ROW, - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index e7a4de5a862..1b41eb0dbfd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index a5f49a687fe..2076c8d7a96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -1185,44 +1185,23 @@ public class StoreFile { return reader.getComparator(); } - /** - * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting - * {@code isCompaction} to false, {@code readPt} to 0 and {@code scannerOrder} to 0. - * Do not use this overload if using this scanner for compactions. - * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) - */ - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) { - // 0 is passed as readpoint because this method is only used by test - // where StoreFile is directly operated upon - return getStoreFileScanner(cacheBlocks, pread, false, 0, 0); - } - - /** - * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting - * {@code scannerOrder} to 0. - * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) - */ - public StoreFileScanner getStoreFileScanner( - boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) { - return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0); - } - /** * Get a scanner to scan over this StoreFile. * @param cacheBlocks should this scanner cache blocks? 
* @param pread use pread (for highly concurrent small readers) * @param isCompaction is scanner being used for compaction? * @param scannerOrder Order of this scanner relative to other scanners. See - * {@link KeyValueScanner#getScannerOrder()}. + * {@link KeyValueScanner#getScannerOrder()}. + * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column, + * otherwise {@code false}. This is a hint for optimization. * @return a scanner */ - public StoreFileScanner getStoreFileScanner( - boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, long scannerOrder) { + public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread, + boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { // Increment the ref count refCount.incrementAndGet(); - return new StoreFileScanner( - this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(), - readPt, scannerOrder); + return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, + reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 02a4cae2ce8..975d3c77c90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -29,16 +29,17 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; /** * KeyValueScanner adaptor over the Reader. It also provides hooks into @@ -58,49 +59,41 @@ public class StoreFileScanner implements KeyValueScanner { private boolean delayedReseek; private Cell delayedSeekKV; - private boolean enforceMVCC = false; - private boolean hasMVCCInfo = false; + private final boolean enforceMVCC; + private final boolean hasMVCCInfo; // A flag represents whether could stop skipping KeyValues for MVCC // if have encountered the next row. Only used for reversed scan private boolean stopSkippingKVsIfNextRow = false; private static AtomicLong seekCount; - private ScanQueryMatcher matcher; + private final boolean canOptimizeForNonNullColumn; - private long readPt; + private final long readPt; // Order of this scanner relative to other scanners when duplicate key-value is found. // Higher values means scanner has newer data. - private long scannerOrder; + private final long scannerOrder; /** * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}. * @param readPt MVCC value to use to filter out the updates newer than this scanner. 
* @param hasMVCC Set to true if underlying store file reader has MVCC info. + * @param scannerOrder Order of the scanner relative to other scanners. See + * {@link KeyValueScanner#getScannerOrder()}. + * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column, + * otherwise {@code false}. This is a hint for optimization. */ public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, - boolean hasMVCC, long readPt) { - this (reader, hfs, useMVCC, hasMVCC, readPt, 0); - } - - /** - * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} - * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}. - * @param readPt MVCC value to use to filter out the updates newer than this scanner. - * @param hasMVCC Set to true if underlying store file reader has MVCC info. - * @param scannerOrder Order of the scanner relative to other scanners. - * See {@link KeyValueScanner#getScannerOrder()}. - */ - public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, - boolean hasMVCC, long readPt, long scannerOrder) { + boolean hasMVCC, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { this.readPt = readPt; this.reader = reader; this.hfs = hfs; this.enforceMVCC = useMVCC; this.hasMVCCInfo = hasMVCC; this.scannerOrder = scannerOrder; + this.canOptimizeForNonNullColumn = canOptimizeForNonNullColumn; } boolean isPrimaryReplica() { @@ -130,24 +123,20 @@ public class StoreFileScanner implements KeyValueScanner { } /** - * Return an array of scanners corresponding to the given set of store files, - * And set the ScanQueryMatcher for each store file scanner for further - * optimization + * Return an array of scanners corresponding to the given set of store files, And set the + * ScanQueryMatcher for each store file scanner for further optimization */ - public static List getScannersForStoreFiles( - Collection files, boolean cacheBlocks, boolean usePread, - boolean isCompaction, boolean canUseDrop, + public static List getScannersForStoreFiles(Collection files, + boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop, ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException { - List scanners = new ArrayList( - files.size()); + List scanners = new ArrayList(files.size()); List sorted_files = new ArrayList<>(files); Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID); for (int i = 0; i < sorted_files.size(); i++) { StoreFile.Reader r = sorted_files.get(i).createReader(canUseDrop); r.setReplicaStoreFile(isPrimaryReplica); - StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, - isCompaction, readPt, i); - scanner.setScanQueryMatcher(matcher); + StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt, + i, matcher != null ? 
!matcher.hasNullColumnInQuery() : false); scanners.add(scanner); } return scanners; @@ -367,12 +356,12 @@ public class StoreFileScanner implements KeyValueScanner { haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()); - } else if (this.matcher != null && !matcher.hasNullColumnInQuery() && - ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) { + } else if (canOptimizeForNonNullColumn + && ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) { // if there is no such delete family kv in the store file, // then no need to seek. - haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), - kv.getRowOffset(), kv.getRowLength()); + haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), kv.getRowOffset(), + kv.getRowLength()); } } @@ -441,10 +430,6 @@ public class StoreFileScanner implements KeyValueScanner { } } - public void setScanQueryMatcher(ScanQueryMatcher matcher) { - this.matcher = matcher; - } - @Override public boolean isFileScanner() { return true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 4447556c2f2..c98af005a42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -41,15 +43,17 @@ import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; +import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.common.annotations.VisibleForTesting; - /** * Scanner scans both the memstore and the Store. Coalesce KeyValue stream * into List<KeyValue> for a single row. @@ -176,6 +180,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected void addCurrentScanners(List scanners) { this.currentScanners.addAll(scanners); } + /** * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we * are not in a compaction. 
@@ -192,9 +197,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (columns != null && scan.isRaw()) { throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); } - matcher = new ScanQueryMatcher(scan, scanInfo, columns, - ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, - oldestUnexpiredTS, now, store.getCoprocessorHost()); + matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, + store.getCoprocessorHost()); this.store.addChangedReaderObserver(this); @@ -263,13 +267,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { this(store, scan, scanInfo, null, - ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false); - if (dropDeletesFromRow == null) { - matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint, - earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost()); + ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false); + if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0) + || (scan.getStopRow() != null && scan.getStopRow().length > 0) + || !scan.getTimeRange().isAllTime()) { + // use legacy query matcher since we do not consider the scan object in our code. Only used to + // keep compatibility for coprocessor. + matcher = LegacyScanQueryMatcher.create(scan, scanInfo, null, scanType, smallestReadPoint, + earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, + store.getCoprocessorHost()); } else { - matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs, - oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); + matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, + earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, + store.getCoprocessorHost()); } // Filter the list of scanners using Bloom filters, time range, TTL, etc. @@ -302,18 +312,27 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner 0); } - private StoreScanner(final Scan scan, ScanInfo scanInfo, - ScanType scanType, final NavigableSet columns, - final List scanners, long earliestPutTs, long readPt) - throws IOException { + public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, + final NavigableSet columns, final List scanners, long earliestPutTs, + long readPt) throws IOException { this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks()); - this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, - Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null); - - // In unit tests, the store could be null - if (this.store != null) { - this.store.addChangedReaderObserver(this); + if (scanType == ScanType.USER_SCAN) { + this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, + null); + } else { + if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0) + || (scan.getStopRow() != null && scan.getStopRow().length > 0) + || !scan.getTimeRange().isAllTime() || columns != null) { + // use legacy query matcher since we do not consider the scan object in our code. Only used + // to keep compatibility for coprocessor. 
+ matcher = LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, Long.MAX_VALUE, + earliestPutTs, oldestUnexpiredTS, now, null, null, store.getCoprocessorHost()); + } else { + this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, + earliestPutTs, oldestUnexpiredTS, now, null, null, null); + } } + // Seek all scanners to the initial key seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); addCurrentScanners(scanners); @@ -487,16 +506,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // only call setRow if the row changes; avoids confusing the query matcher // if scanning intra-row - byte[] row = cell.getRowArray(); - int offset = cell.getRowOffset(); - short length = cell.getRowLength(); // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing // rows. Else it is possible we are still traversing the same row so we must perform the row // comparison. - if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null) { + if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { this.countPerRow = 0; - matcher.setRow(row, offset, length); + matcher.setToNewRow(cell); } // Clear progress away unless invoker has indicated it should be kept. @@ -524,14 +540,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner ScanQueryMatcher.MatchCode qcode = matcher.match(cell); qcode = optimize(qcode, cell); - switch(qcode) { + switch (qcode) { case INCLUDE: case INCLUDE_AND_SEEK_NEXT_ROW: case INCLUDE_AND_SEEK_NEXT_COL: Filter f = matcher.getFilter(); if (f != null) { - // TODO convert Scan Query Matcher to be Cell instead of KV based ? cell = f.transformCell(cell); } @@ -545,7 +560,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow() // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do // another compareRow to say the current row is DONE - matcher.row = null; + matcher.clearCurrentRow(); seekToNextRow(cell); break LOOP; } @@ -576,7 +591,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow() // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do // another compareRow to say the current row is DONE - matcher.row = null; + matcher.clearCurrentRow(); seekToNextRow(cell); } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { seekAsDirection(matcher.getKeyForNextColumn(cell)); @@ -602,7 +617,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // We are sure that this row is done and we are in the next row. // So subsequent StoresScanner.next() call need not do another compare // and set the matcher.row - matcher.row = null; + matcher.clearCurrentRow(); return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); case DONE_SCAN: @@ -618,7 +633,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow() // the heap.peek() will any way be in the next row. 
So the SQM.match(cell) need do // another compareRow to say the current row is DONE - matcher.row = null; + matcher.clearCurrentRow(); seekToNextRow(cell); break; @@ -631,7 +646,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner break; case SEEK_NEXT_USING_HINT: - // TODO convert resee to Cell? Cell nextKV = matcher.getNextKeyHint(cell); if (nextKV != null) { seekAsDirection(nextKV); @@ -840,11 +854,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner byte[] row = kv.getRowArray(); int offset = kv.getRowOffset(); short length = kv.getRowLength(); - if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, - matcher.rowOffset, matcher.rowLength)) { + Cell currentRow = matcher.currentRow(); + + if ((currentRow == null) || !Bytes.equals(row, offset, length, currentRow.getRowArray(), + currentRow.getRowOffset(), currentRow.getRowLength())) { this.countPerRow = 0; - matcher.reset(); - matcher.setRow(row, offset, length); + matcher.setToNewRow(kv); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java similarity index 97% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java index 71ea1bd6c7e..10961a95416 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver; +package org.apache.hadoop.hbase.regionserver.querymatcher; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * NOT thread-safe because it is not used in a multi-threaded context, yet. */ @InterfaceAudience.Private -public class ColumnCount { +class ColumnCount { private final byte [] bytes; private final int offset; private final int length; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java similarity index 95% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java index f23a9a87aa9..8ac78ab2841 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java @@ -16,12 +16,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
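Editorial note for reviewers: the StoreScanner constructors above no longer call a single ScanQueryMatcher constructor; they route to one of three factories. A user scan always gets UserScanQueryMatcher.create(...). A compaction or flush scan whose Scan object still carries state (filter, start/stop row, non-default time range, explicit columns) falls back to LegacyScanQueryMatcher.create(...) solely to preserve coprocessor-visible behaviour; everything else goes to CompactionScanQueryMatcher.create(...). The sketch below condenses that decision into one hypothetical helper -- the MatcherSelectionSketch/chooseMatcher names are made up, while the factory signatures are taken from the hunks above.

// Illustrative sketch only -- not part of the patch. Class and method names here are
// hypothetical; the factory signatures mirror the StoreScanner hunks above.
import java.io.IOException;
import java.util.NavigableSet;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;

final class MatcherSelectionSketch {

  static ScanQueryMatcher chooseMatcher(Scan scan, ScanInfo scanInfo,
      NavigableSet<byte[]> columns, ScanType scanType, long readPointToUse, long earliestPutTs,
      long oldestUnexpiredTS, long now, RegionCoprocessorHost host) throws IOException {
    if (scanType == ScanType.USER_SCAN) {
      // user scans always go through the new user-scan matcher
      return UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, host);
    }
    // a compaction/flush scan that still carries Scan state (filter, start/stop row,
    // time range, explicit columns) is routed to the legacy matcher purely to keep
    // coprocessor-visible behaviour unchanged
    if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
        || (scan.getStopRow() != null && scan.getStopRow().length > 0)
        || !scan.getTimeRange().isAllTime() || columns != null) {
      return LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, readPointToUse,
        earliestPutTs, oldestUnexpiredTS, now, null, null, host);
    }
    // otherwise the Scan object is ignored and the plain compaction matcher is used
    return CompactionScanQueryMatcher.create(scanInfo, scanType, readPointToUse, earliestPutTs,
      oldestUnexpiredTS, now, null, null, host);
  }
}

The same hunks also replace the exposed matcher.row/rowOffset/rowLength fields and the setRow(byte[], int, int)/reset() calls with the Cell-based currentRow(), setToNewRow(Cell) and clearCurrentRow(), which is why every "matcher.row = null" site above becomes matcher.clearCurrentRow().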
*/ -package org.apache.hadoop.hbase.regionserver; +package org.apache.hadoop.hbase.regionserver.querymatcher; import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; /** * Implementing classes of this interface will be used for the tracking @@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; * believes that the current column should be skipped (by timestamp, filter etc.) * *
<p>
- * These two methods returns a - * {@link org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode} + * These two methods returns a + * {@link org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode} * to define what action should be taken. *
<p>
* This class is NOT thread-safe as queries are never multi-threaded diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java new file mode 100644 index 00000000000..d3224dceab1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.querymatcher; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.ScanType; + +/** + * Query matcher for compaction. + */ +@InterfaceAudience.Private +public abstract class CompactionScanQueryMatcher extends ScanQueryMatcher { + + /** readPoint over which the KVs are unconditionally included */ + protected final long maxReadPointToTrackVersions; + + /** Keeps track of deletes */ + protected final DeleteTracker deletes; + + /** whether to return deleted rows */ + protected final KeepDeletedCells keepDeletedCells; + + protected CompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes, + long readPointToUse, long oldestUnexpiredTS, long now) { + super(HConstants.EMPTY_START_ROW, scanInfo, + new ScanWildcardColumnTracker(scanInfo.getMinVersions(), scanInfo.getMaxVersions(), + oldestUnexpiredTS), + oldestUnexpiredTS, now); + this.maxReadPointToTrackVersions = readPointToUse; + this.deletes = deletes; + this.keepDeletedCells = scanInfo.getKeepDeletedCells(); + } + + @Override + public boolean hasNullColumnInQuery() { + return true; + } + + @Override + public boolean isUserScan() { + return false; + } + + @Override + public boolean moreRowsMayExistAfter(Cell cell) { + return true; + } + + @Override + public Filter getFilter() { + // no filter when compaction + return null; + } + + @Override + public Cell getNextKeyHint(Cell cell) throws IOException { + // no filter, so no key hint. + return null; + } + + @Override + protected void reset() { + deletes.reset(); + } + + protected final void trackDelete(Cell cell) { + // If keepDeletedCells is true, then we only remove cells by versions or TTL during + // compaction, so we do not need to track delete here. 
+ // If keepDeletedCells is TTL and the delete marker is expired, then we can make sure that the + // minVerions is larger than 0(otherwise we will just return at preCheck). So here we still + // need to track the delete marker to see if it masks some cells. + if (keepDeletedCells == KeepDeletedCells.FALSE + || (keepDeletedCells == KeepDeletedCells.TTL && cell.getTimestamp() < oldestUnexpiredTS)) { + deletes.add(cell); + } + } + + public static CompactionScanQueryMatcher create(ScanInfo scanInfo, ScanType scanType, + long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, + byte[] dropDeletesFromRow, byte[] dropDeletesToRow, + RegionCoprocessorHost regionCoprocessorHost) throws IOException { + DeleteTracker deleteTracker = instantiateDeleteTracker(regionCoprocessorHost); + if (dropDeletesFromRow == null) { + if (scanType == ScanType.COMPACT_RETAIN_DELETES) { + return new MinorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse, + oldestUnexpiredTS, now); + } else { + return new MajorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse, + earliestPutTs, oldestUnexpiredTS, now); + } + } else { + return new StripeCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse, + earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java similarity index 81% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeleteTracker.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java index 8f466fc720f..4e1ba4ec2e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeleteTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver; +package org.apache.hadoop.hbase.regionserver.querymatcher; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; /** - * This interface is used for the tracking and enforcement of Deletes - * during the course of a Get or Scan operation. + * This interface is used for the tracking and enforcement of Deletes during the course of a Get or + * Scan operation. *
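Editorial note: the trackDelete(Cell) helper added above only hands a delete marker to the DeleteTracker when the marker can still mask other cells during this compaction. Below is a minimal, self-contained sketch of that predicate with made-up timestamps; the TrackDeleteSketch/isDeleteTracked names and the numbers are illustrative, not part of the patch.

import org.apache.hadoop.hbase.KeepDeletedCells;

// Hypothetical demo class; mirrors the condition in trackDelete(Cell) above.
public final class TrackDeleteSketch {

  static boolean isDeleteTracked(KeepDeletedCells keepDeletedCells, long markerTs,
      long oldestUnexpiredTS) {
    // same shape as the patch: track when deleted cells are not kept at all, or when they
    // are only kept until TTL and this marker has already expired
    return keepDeletedCells == KeepDeletedCells.FALSE
        || (keepDeletedCells == KeepDeletedCells.TTL && markerTs < oldestUnexpiredTS);
  }

  public static void main(String[] args) {
    long oldestUnexpiredTS = 1000L;
    // FALSE: markers are always tracked so they can mask older puts during compaction
    System.out.println(isDeleteTracked(KeepDeletedCells.FALSE, 1500L, oldestUnexpiredTS)); // true
    // TTL: an already-expired marker (ts 900 < 1000) is tracked ...
    System.out.println(isDeleteTracked(KeepDeletedCells.TTL, 900L, oldestUnexpiredTS));    // true
    // ... but a live one is retained and must not mask anything yet
    System.out.println(isDeleteTracked(KeepDeletedCells.TTL, 1500L, oldestUnexpiredTS));   // false
    // TRUE: deleted cells are kept, so compaction never applies the marker here
    System.out.println(isDeleteTracked(KeepDeletedCells.TRUE, 900L, oldestUnexpiredTS));   // false
  }
}

The create(...) factory at the end of the class then dispatches to Minor-, Major- or StripeCompactionScanQueryMatcher based on the ScanType and the dropDeletes row boundaries, as shown directly in the hunk above.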
<p>
* This class is utilized through three methods: - *