diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java index fed20c49466..2efcde1982b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java @@ -184,12 +184,11 @@ public class TimeRange { } /** - * Check if the specified timestamp is within this TimeRange. + * Check if the specified timestamp is within or after this TimeRange. *
<p>
- * Returns true if within interval [minStamp, maxStamp), false - * if not. + * Returns true if greater than minStamp, false if not. * @param timestamp timestamp to check - * @return true if within TimeRange, false if not + * @return true if within or after TimeRange, false if not */ public boolean withinOrAfterTimeRange(long timestamp) { assert timestamp >= 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index bff727a98b7..748268ecde8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.regionserver.DeleteTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; @@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Pair; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 47299541a41..4c946440963 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.regionserver.DeleteTracker; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index b32b757f3aa..c4bd84951c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -86,6 +86,7 @@ import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.EncryptionUtil; @@ -1749,35 +1750,6 @@ public class HStore implements Store { return wantedVersions > maxVersions ? maxVersions: wantedVersions; } - /** - * @param cell - * @param oldestTimestamp - * @return true if the cell is expired - */ - static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) { - // Look for a TTL tag first. Use it instead of the family setting if - // found. If a cell has multiple TTLs, resolve the conflict by using the - // first tag encountered. - Iterator i = CellUtil.tagsIterator(cell); - while (i.hasNext()) { - Tag t = i.next(); - if (TagType.TTL_TAG_TYPE == t.getType()) { - // Unlike in schema cell TTLs are stored in milliseconds, no need - // to convert - long ts = cell.getTimestamp(); - assert t.getValueLength() == Bytes.SIZEOF_LONG; - long ttl = TagUtil.getValueAsLong(t); - if (ts + ttl < now) { - return true; - } - // Per cell TTLs cannot extend lifetime beyond family settings, so - // fall through to check that - break; - } - } - return false; - } - @Override public boolean canSplit() { this.lock.readLock().lock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index acfaa96a71a..567664e4f6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.wal.WALKey; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java index 42f7369ff23..349e1661c31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java @@ -122,11 +122,11 @@ public class ScanInfo { return this.parallelSeekEnabled; } - byte[] getFamily() { + public byte[] getFamily() { return family; } - int getMinVersions() { + public int getMinVersions() { return minVersions; } @@ -138,7 +138,7 @@ public class ScanInfo { return ttl; } - KeepDeletedCells getKeepDeletedCells() { + public KeepDeletedCells getKeepDeletedCells() { return keepDeletedCells; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java deleted file mode 100644 index 
d2d0ccbbc28..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ /dev/null @@ -1,699 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.Arrays; -import java.util.NavigableSet; - -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeepDeletedCells; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.Filter.ReturnCode; -import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -import com.google.common.base.Preconditions; - -/** - * A query matcher that is specifically designed for the scan case. - */ -@InterfaceAudience.Private -public class ScanQueryMatcher { - // Optimization so we can skip lots of compares when we decide to skip - // to the next row. - private boolean stickyNextRow; - private final byte[] stopRow; - - private final TimeRange tr; - - private final Filter filter; - - /** Keeps track of deletes */ - private final DeleteTracker deletes; - - /* - * The following three booleans define how we deal with deletes. - * There are three different aspects: - * 1. Whether to keep delete markers. This is used in compactions. - * Minor compactions always keep delete markers. - * 2. Whether to keep deleted rows. This is also used in compactions, - * if the store is set to keep deleted rows. This implies keeping - * the delete markers as well. - * In this case deleted rows are subject to the normal max version - * and TTL/min version rules just like "normal" rows. - * 3. Whether a scan can do time travel queries even before deleted - * marker to reach deleted rows. 
- * marker to reach deleted rows.
- */ - /** whether to retain delete markers */ - private boolean retainDeletesInOutput; - - /** whether to return deleted rows */ - private final KeepDeletedCells keepDeletedCells; - /** whether time range queries can see rows "behind" a delete */ - private final boolean seePastDeleteMarkers; - - - /** Keeps track of columns and versions */ - private final ColumnTracker columns; - - /** Key to seek to in memstore and StoreFiles */ - private final Cell startKey; - - /** Row comparator for the region this query is for */ - private final CellComparator rowComparator; - - /* row is not private for tests */ - /** Row the query is on */ - Cell curCell; - - /** - * Oldest put in any of the involved store files - * Used to decide whether it is ok to delete - * family delete marker of this store keeps - * deleted KVs. - */ - private final long earliestPutTs; - private final long ttl; - - /** The oldest timestamp we are interested in, based on TTL */ - private final long oldestUnexpiredTS; - private final long now; - - /** readPoint over which the KVs are unconditionally included */ - protected long maxReadPointToTrackVersions; - - private byte[] dropDeletesFromRow = null, dropDeletesToRow = null; - - /** - * This variable shows whether there is an null column in the query. There - * always exists a null column in the wildcard column query. - * There maybe exists a null column in the explicit column query based on the - * first column. - * */ - private boolean hasNullColumn = true; - - private RegionCoprocessorHost regionCoprocessorHost= null; - - // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete - // marker is always removed during a major compaction. If set to non-zero - // value then major compaction will try to keep a delete marker around for - // the given number of milliseconds. We want to keep the delete markers - // around a bit longer because old puts might appear out-of-order. For - // example, during log replication between two clusters. - // - // If the delete marker has lived longer than its column-family's TTL then - // the delete marker will be removed even if time.to.purge.deletes has not - // passed. This is because all the Puts that this delete marker can influence - // would have also expired. (Removing of delete markers on col family TTL will - // not happen if min-versions is set to non-zero) - // - // But, if time.to.purge.deletes has not expired then a delete - // marker will not be removed just because there are no Puts that it is - // currently influencing. This is because Puts, that this delete can - // influence. may appear out of order. - private final long timeToPurgeDeletes; - - private final boolean isUserScan; - - private final boolean isReversed; - - /** - * True if we are doing a 'Get' Scan. Every Get is actually a one-row Scan. - */ - private final boolean get; - - /** - * Construct a QueryMatcher for a scan - * @param scanInfo The store's immutable scan info - * @param scanType Type of the scan - * @param earliestPutTs Earliest put seen in any of the store files. 
- * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL - */ - public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet columns, - ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, - long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException { - TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily()); - if (timeRange == null) { - this.tr = scan.getTimeRange(); - } else { - this.tr = timeRange; - } - this.get = scan.isGetScan(); - this.rowComparator = scanInfo.getComparator(); - this.regionCoprocessorHost = regionCoprocessorHost; - this.deletes = instantiateDeleteTracker(); - this.stopRow = scan.getStopRow(); - this.startKey = CellUtil.createFirstDeleteFamilyCellOnRow(scan.getStartRow(), - scanInfo.getFamily()); - this.filter = scan.getFilter(); - this.earliestPutTs = earliestPutTs; - this.oldestUnexpiredTS = oldestUnexpiredTS; - this.now = now; - - this.maxReadPointToTrackVersions = readPointToUse; - this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes(); - this.ttl = oldestUnexpiredTS; - - /* how to deal with deletes */ - this.isUserScan = scanType == ScanType.USER_SCAN; - // keep deleted cells: if compaction or raw scan - this.keepDeletedCells = scan.isRaw() ? KeepDeletedCells.TRUE : - isUserScan ? KeepDeletedCells.FALSE : scanInfo.getKeepDeletedCells(); - // retain deletes: if minor compaction or raw scanisDone - this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES || scan.isRaw(); - // seePastDeleteMarker: user initiated scans - this.seePastDeleteMarkers = - scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE && isUserScan; - - int maxVersions = - scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(), - scanInfo.getMaxVersions()); - - // Single branch to deal with two types of reads (columns vs all in family) - if (columns == null || columns.size() == 0) { - // there is always a null column in the wildcard column query. - hasNullColumn = true; - - // use a specialized scan for wildcard column tracker. - this.columns = new ScanWildcardColumnTracker( - scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS); - } else { - // whether there is null column in the explicit column query - hasNullColumn = (columns.first().length == 0); - - // We can share the ExplicitColumnTracker, diff is we reset - // between rows, not between storefiles. - this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, - oldestUnexpiredTS); - } - this.isReversed = scan.isReversed(); - } - - private DeleteTracker instantiateDeleteTracker() throws IOException { - DeleteTracker tracker = new ScanDeleteTracker(); - if (regionCoprocessorHost != null) { - tracker = regionCoprocessorHost.postInstantiateDeleteTracker(tracker); - } - return tracker; - } - - /** - * Construct a QueryMatcher for a scan that drop deletes from a limited range of rows. - * @param scan - * @param scanInfo The store's immutable scan info - * @param columns - * @param earliestPutTs Earliest put seen in any of the store files. - * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL - * @param now the current server time - * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. - * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. 
- * @param regionCoprocessorHost - * @throws IOException - */ - public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet columns, - long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, - byte[] dropDeletesFromRow, byte[] dropDeletesToRow, - RegionCoprocessorHost regionCoprocessorHost) throws IOException { - this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs, - oldestUnexpiredTS, now, regionCoprocessorHost); - Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null)); - this.dropDeletesFromRow = dropDeletesFromRow; - this.dropDeletesToRow = dropDeletesToRow; - } - - /* - * Constructor for tests - */ - ScanQueryMatcher(Scan scan, ScanInfo scanInfo, - NavigableSet columns, long oldestUnexpiredTS, long now) throws IOException { - this(scan, scanInfo, columns, ScanType.USER_SCAN, - Long.MAX_VALUE, /* max Readpoint to track versions */ - HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null); - } - - /** - * - * @return whether there is an null column in the query - */ - public boolean hasNullColumnInQuery() { - return hasNullColumn; - } - - /** - * Determines if the caller should do one of several things: - * - seek/skip to the next row (MatchCode.SEEK_NEXT_ROW) - * - seek/skip to the next column (MatchCode.SEEK_NEXT_COL) - * - include the current KeyValue (MatchCode.INCLUDE) - * - ignore the current KeyValue (MatchCode.SKIP) - * - got to the next row (MatchCode.DONE) - * - * @param cell KeyValue to check - * @return The match code instance. - * @throws IOException in case there is an internal consistency problem - * caused by a data corruption. - */ - public MatchCode match(Cell cell) throws IOException { - if (filter != null && filter.filterAllRemaining()) { - return MatchCode.DONE_SCAN; - } - if (curCell != null) { - int ret = this.rowComparator.compareRows(curCell, cell); - if (!this.isReversed) { - if (ret <= -1) { - return MatchCode.DONE; - } else if (ret >= 1) { - // could optimize this, if necessary? - // Could also be called SEEK_TO_CURRENT_ROW, but this - // should be rare/never happens. - return MatchCode.SEEK_NEXT_ROW; - } - } else { - if (ret <= -1) { - return MatchCode.SEEK_NEXT_ROW; - } else if (ret >= 1) { - return MatchCode.DONE; - } - } - } else { - // Since the curCell is null it means we are already sure that we have moved over to the next row - return MatchCode.DONE; - } - - // optimize case. - if (this.stickyNextRow) { - return MatchCode.SEEK_NEXT_ROW; - } - - if (this.columns.done()) { - stickyNextRow = true; - return MatchCode.SEEK_NEXT_ROW; - } - - long timestamp = cell.getTimestamp(); - // check for early out based on timestamp alone - if (columns.isDone(timestamp)) { - return columns.getNextRowOrNextColumn(cell); - } - // check if the cell is expired by cell TTL - if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) { - return MatchCode.SKIP; - } - - /* - * The delete logic is pretty complicated now. - * This is corroborated by the following: - * 1. The store might be instructed to keep deleted rows around. - * 2. A scan can optionally see past a delete marker now. - * 3. If deleted rows are kept, we have to find out when we can - * remove the delete markers. - * 4. Family delete markers are always first (regardless of their TS) - * 5. Delete markers should not be counted as version - * 6. Delete markers affect puts of the *same* TS - * 7. 
Delete marker need to be version counted together with puts - * they affect - */ - byte typeByte = cell.getTypeByte(); - long mvccVersion = cell.getSequenceId(); - if (CellUtil.isDelete(cell)) { - if (keepDeletedCells == KeepDeletedCells.FALSE - || (keepDeletedCells == KeepDeletedCells.TTL && timestamp < ttl)) { - // first ignore delete markers if the scanner can do so, and the - // range does not include the marker - // - // during flushes and compactions also ignore delete markers newer - // than the readpoint of any open scanner, this prevents deleted - // rows that could still be seen by a scanner from being collected - boolean includeDeleteMarker = seePastDeleteMarkers ? - tr.withinTimeRange(timestamp) : - tr.withinOrAfterTimeRange(timestamp); - if (includeDeleteMarker - && mvccVersion <= maxReadPointToTrackVersions) { - this.deletes.add(cell); - } - // Can't early out now, because DelFam come before any other keys - } - - if ((!isUserScan) - && timeToPurgeDeletes > 0 - && (EnvironmentEdgeManager.currentTime() - timestamp) - <= timeToPurgeDeletes) { - return MatchCode.INCLUDE; - } else if (retainDeletesInOutput || mvccVersion > maxReadPointToTrackVersions) { - // always include or it is not time yet to check whether it is OK - // to purge deltes or not - if (!isUserScan) { - // if this is not a user scan (compaction), we can filter this deletemarker right here - // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking - return MatchCode.INCLUDE; - } - } else if (keepDeletedCells == KeepDeletedCells.TRUE - || (keepDeletedCells == KeepDeletedCells.TTL && timestamp >= ttl)) { - if (timestamp < earliestPutTs) { - // keeping delete rows, but there are no puts older than - // this delete in the store files. - return columns.getNextRowOrNextColumn(cell); - } - // else: fall through and do version counting on the - // delete markers - } else { - return MatchCode.SKIP; - } - // note the following next else if... - // delete marker are not subject to other delete markers - } else if (!this.deletes.isEmpty()) { - DeleteResult deleteResult = deletes.isDeleted(cell); - switch (deleteResult) { - case FAMILY_DELETED: - case COLUMN_DELETED: - return columns.getNextRowOrNextColumn(cell); - case VERSION_DELETED: - case FAMILY_VERSION_DELETED: - return MatchCode.SKIP; - case NOT_DELETED: - break; - default: - throw new RuntimeException("UNEXPECTED"); - } - } - - // NOTE: Cryptic stuff! - // if the timestamp is HConstants.OLDEST_TIMESTAMP, then this is a fake cell made to prime a - // Scanner; See KeyValueUTil#createLastOnRow. This Cell should never end up returning out of - // here a matchcode of INCLUDE else we will return to the client a fake Cell. If we call - // TimeRange, it will return 0 because it doesn't deal in OLDEST_TIMESTAMP and we will fall - // into the later code where we could return a matchcode of INCLUDE. See HBASE-16074 "ITBLL - // fails, reports lost big or tiny families" for a horror story. Check here for - // OLDEST_TIMESTAMP. TimeRange#compare is about more generic timestamps, between 0L and - // Long.MAX_LONG. It doesn't do OLDEST_TIMESTAMP weird handling. - int timestampComparison = timestamp == HConstants.OLDEST_TIMESTAMP? 
-1: tr.compare(timestamp); - if (timestampComparison >= 1) { - return MatchCode.SKIP; - } else if (timestampComparison <= -1) { - return columns.getNextRowOrNextColumn(cell); - } - - // STEP 1: Check if the column is part of the requested columns - MatchCode colChecker = columns.checkColumn(cell, typeByte); - if (colChecker == MatchCode.INCLUDE) { - ReturnCode filterResponse = ReturnCode.SKIP; - // STEP 2: Yes, the column is part of the requested columns. Check if filter is present - if (filter != null) { - // STEP 3: Filter the key value and return if it filters out - filterResponse = filter.filterKeyValue(cell); - switch (filterResponse) { - case SKIP: - return MatchCode.SKIP; - case NEXT_COL: - return columns.getNextRowOrNextColumn(cell); - case NEXT_ROW: - stickyNextRow = true; - return MatchCode.SEEK_NEXT_ROW; - case SEEK_NEXT_USING_HINT: - return MatchCode.SEEK_NEXT_USING_HINT; - default: - //It means it is either include or include and seek next - break; - } - } - /* - * STEP 4: Reaching this step means the column is part of the requested columns and either - * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response. - * Now check the number of versions needed. This method call returns SKIP, INCLUDE, - * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL. - * - * FilterResponse ColumnChecker Desired behavior - * INCLUDE SKIP row has already been included, SKIP. - * INCLUDE INCLUDE INCLUDE - * INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP. - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * - * In all the above scenarios, we return the column checker return value except for - * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE) - */ - colChecker = columns.checkVersions(cell, timestamp, typeByte, - mvccVersion > maxReadPointToTrackVersions); - //Optimize with stickyNextRow - boolean seekNextRowFromEssential = filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW && - filter.isFamilyEssential(cell.getFamilyArray()); - if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW || seekNextRowFromEssential) { - stickyNextRow = true; - } - if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) { - if (colChecker != MatchCode.SKIP) { - return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; - } - return MatchCode.SEEK_NEXT_ROW; - } - return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && - colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL - : colChecker; - } - stickyNextRow = (colChecker == MatchCode.SEEK_NEXT_ROW) ? true - : stickyNextRow; - return colChecker; - } - - /** Handle partial-drop-deletes. As we match keys in order, when we have a range from which - * we can drop deletes, we can set retainDeletesInOutput to false for the duration of this - * range only, and maintain consistency. */ - private void checkPartialDropDeleteRange(Cell curCell) { - // If partial-drop-deletes are used, initially, dropDeletesFromRow and dropDeletesToRow - // are both set, and the matcher is set to retain deletes. We assume ordered keys. When - // dropDeletesFromRow is leq current kv, we start dropping deletes and reset - // dropDeletesFromRow; thus the 2nd "if" starts to apply. 
- if ((dropDeletesFromRow != null) - && (Arrays.equals(dropDeletesFromRow, HConstants.EMPTY_START_ROW) || - (CellComparator.COMPARATOR.compareRows(curCell, dropDeletesFromRow, 0, - dropDeletesFromRow.length) >= 0))) { - retainDeletesInOutput = false; - dropDeletesFromRow = null; - } - // If dropDeletesFromRow is null and dropDeletesToRow is set, we are inside the partial- - // drop-deletes range. When dropDeletesToRow is leq current kv, we stop dropping deletes, - // and reset dropDeletesToRow so that we don't do any more compares. - if ((dropDeletesFromRow == null) - && (dropDeletesToRow != null) - && !Arrays.equals(dropDeletesToRow, HConstants.EMPTY_END_ROW) - && (CellComparator.COMPARATOR - .compareRows(curCell, dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) { - retainDeletesInOutput = true; - dropDeletesToRow = null; - } - } - - /** - * @return Returns false if we know there are no more rows to be scanned (We've reached the - * stopRow or we are scanning on row only because this Scan is for a Get, etc. - */ - public boolean moreRowsMayExistAfter(Cell kv) { - // If a 'get' Scan -- we are doing a Get (every Get is a single-row Scan in implementation) -- - // then we are looking at one row only, the one specified in the Get coordinate..so we know - // for sure that there are no more rows on this Scan - if (this.get) { - return false; - } - // If no stopRow, return that there may be more rows. The tests that follow depend on a - // non-empty, non-default stopRow so this little test below short-circuits out doing the - // following compares. - if (this.stopRow == null || this.stopRow == HConstants.EMPTY_BYTE_ARRAY) { - return true; - } - return this.isReversed? - rowComparator.compareRows(kv, stopRow, 0, stopRow.length) > 0: - Bytes.equals(stopRow, HConstants.EMPTY_END_ROW) || - rowComparator.compareRows(kv, stopRow, 0, stopRow.length) < 0; - } - - /** - * Set the row when there is change in row - * @param curCell - */ - public void setToNewRow(Cell curCell) { - checkPartialDropDeleteRange(curCell); - this.curCell = curCell; - reset(); - } - - public void reset() { - this.deletes.reset(); - this.columns.reset(); - - stickyNextRow = false; - } - - /** - * - * @return the start key - */ - public Cell getStartKey() { - return this.startKey; - } - - /** - * - * @return the Filter - */ - Filter getFilter() { - return this.filter; - } - - public Cell getNextKeyHint(Cell kv) throws IOException { - if (filter == null) { - return null; - } else { - return filter.getNextCellHint(kv); - } - } - - public Cell getKeyForNextColumn(Cell kv) { - ColumnCount nextColumn = columns.getColumnHint(); - if (nextColumn == null) { - return CellUtil.createLastOnRowCol(kv); - } else { - return CellUtil.createFirstOnRowCol(kv, nextColumn.getBuffer(), nextColumn.getOffset(), - nextColumn.getLength()); - } - } - - public Cell getKeyForNextRow(Cell c) { - return CellUtil.createLastOnRow(c); - } - - /** - * @param nextIndexed the key of the next entry in the block index (if any) - * @param kv The Cell we're using to calculate the seek key - * @return result of the compare between the indexed key and the key portion of the passed cell - */ - public int compareKeyForNextRow(Cell nextIndexed, Cell kv) { - return rowComparator.compareKeyBasedOnColHint(nextIndexed, kv, 0, 0, null, 0, 0, - HConstants.OLDEST_TIMESTAMP, Type.Minimum.getCode()); - } - - /** - * @param nextIndexed the key of the next entry in the block index (if any) - * @param currentCell The Cell we're using to calculate the seek key - * @return result of 
the compare between the indexed key and the key portion of the passed cell - */ - public int compareKeyForNextColumn(Cell nextIndexed, Cell currentCell) { - ColumnCount nextColumn = columns.getColumnHint(); - if (nextColumn == null) { - return rowComparator.compareKeyBasedOnColHint(nextIndexed, currentCell, 0, 0, null, 0, 0, - HConstants.OLDEST_TIMESTAMP, Type.Minimum.getCode()); - } else { - return rowComparator.compareKeyBasedOnColHint(nextIndexed, currentCell, - currentCell.getFamilyOffset(), currentCell.getFamilyLength(), nextColumn.getBuffer(), - nextColumn.getOffset(), nextColumn.getLength(), HConstants.LATEST_TIMESTAMP, - Type.Maximum.getCode()); - } - } - - boolean isUserScan() { - return this.isUserScan; - } - - //Used only for testing purposes - static MatchCode checkColumn(ColumnTracker columnTracker, byte[] bytes, int offset, - int length, long ttl, byte type, boolean ignoreCount) throws IOException { - KeyValue kv = KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY, 0, 0, - HConstants.EMPTY_BYTE_ARRAY, 0, 0, bytes, offset, length); - MatchCode matchCode = columnTracker.checkColumn(kv, type); - if (matchCode == MatchCode.INCLUDE) { - return columnTracker.checkVersions(kv, ttl, type, ignoreCount); - } - return matchCode; - } - - /** - * {@link #match} return codes. These instruct the scanner moving through - * memstores and StoreFiles what to do with the current KeyValue. - *
<p>
- * Additionally, this contains "early-out" language to tell the scanner to - * move on to the next File (memstore or Storefile), or to return immediately. - */ - public static enum MatchCode { - /** - * Include KeyValue in the returned result - */ - INCLUDE, - - /** - * Do not include KeyValue in the returned result - */ - SKIP, - - /** - * Do not include, jump to next StoreFile or memstore (in time order) - */ - NEXT, - - /** - * Do not include, return current result - */ - DONE, - - /** - * These codes are used by the ScanQueryMatcher - */ - - /** - * Done with the row, seek there. - */ - SEEK_NEXT_ROW, - /** - * Done with column, seek to next. - */ - SEEK_NEXT_COL, - - /** - * Done with scan, thanks to the row filter. - */ - DONE_SCAN, - - /* - * Seek to next key which is given as hint. - */ - SEEK_NEXT_USING_HINT, - - /** - * Include KeyValue and done with column, seek to next. - */ - INCLUDE_AND_SEEK_NEXT_COL, - - /** - * Include KeyValue and done with row, seek to next. - */ - INCLUDE_AND_SEEK_NEXT_ROW, - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 5dec59a4a3a..853a4cf283f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index 15822376380..dca7388119e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -18,6 +18,12 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.DataInput; +import java.io.IOException; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,12 +49,6 @@ import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.Bytes; -import java.io.DataInput; -import java.io.IOException; -import java.util.Map; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicInteger; - /** * Reader for a StoreFile. */ @@ -113,47 +113,23 @@ public class StoreFileReader { return reader.getComparator(); } - /** - * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting - * {@code isCompaction} to false, {@code readPt} to 0 and {@code scannerOrder} to 0. - * Do not use this overload if using this scanner for compactions. 
- * - * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) - */ - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) { - // 0 is passed as readpoint because this method is only used by test - // where StoreFile is directly operated upon - return getStoreFileScanner(cacheBlocks, pread, false, 0, 0); - } - - /** - * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting - * {@code scannerOrder} to 0. - * - * @see #getStoreFileScanner(boolean, boolean, boolean, long, long) - */ - public StoreFileScanner getStoreFileScanner( - boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) { - return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0); - } - /** * Get a scanner to scan over this StoreFile. - * * @param cacheBlocks should this scanner cache blocks? * @param pread use pread (for highly concurrent small readers) * @param isCompaction is scanner being used for compaction? * @param scannerOrder Order of this scanner relative to other scanners. See - * {@link KeyValueScanner#getScannerOrder()}. + * {@link KeyValueScanner#getScannerOrder()}. + * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column, + * otherwise {@code false}. This is a hint for optimization. * @return a scanner */ - public StoreFileScanner getStoreFileScanner( - boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, long scannerOrder) { + public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread, + boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { // Increment the ref count refCount.incrementAndGet(); - return new StoreFileScanner( - this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(), - readPt, scannerOrder); + return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, + reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 4955ffea630..ac55d8cb303 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.util.Counter; /** @@ -54,49 +55,41 @@ public class StoreFileScanner implements KeyValueScanner { private boolean delayedReseek; private Cell delayedSeekKV; - private boolean enforceMVCC = false; - private boolean hasMVCCInfo = false; + private final boolean enforceMVCC; + private final boolean hasMVCCInfo; // A flag represents whether could stop skipping KeyValues for MVCC // if have encountered the next row. Only used for reversed scan private boolean stopSkippingKVsIfNextRow = false; private static Counter seekCount; - private ScanQueryMatcher matcher; + private final boolean canOptimizeForNonNullColumn; - private long readPt; + private final long readPt; // Order of this scanner relative to other scanners when duplicate key-value is found. 
// Higher values means scanner has newer data. - private long scannerOrder; + private final long scannerOrder; /** * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}. * @param readPt MVCC value to use to filter out the updates newer than this scanner. * @param hasMVCC Set to true if underlying store file reader has MVCC info. + * @param scannerOrder Order of the scanner relative to other scanners. See + * {@link KeyValueScanner#getScannerOrder()}. + * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column, + * otherwise {@code false}. This is a hint for optimization. */ public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC, - boolean hasMVCC, long readPt) { - this (reader, hfs, useMVCC, hasMVCC, readPt, 0); - } - - /** - * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} - * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}. - * @param readPt MVCC value to use to filter out the updates newer than this scanner. - * @param hasMVCC Set to true if underlying store file reader has MVCC info. - * @param scannerOrder Order of the scanner relative to other scanners. - * See {@link KeyValueScanner#getScannerOrder()}. - */ - public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC, - boolean hasMVCC, long readPt, long scannerOrder) { + boolean hasMVCC, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { this.readPt = readPt; this.reader = reader; this.hfs = hfs; this.enforceMVCC = useMVCC; this.hasMVCCInfo = hasMVCC; this.scannerOrder = scannerOrder; + this.canOptimizeForNonNullColumn = canOptimizeForNonNullColumn; } boolean isPrimaryReplica() { @@ -126,24 +119,20 @@ public class StoreFileScanner implements KeyValueScanner { } /** - * Return an array of scanners corresponding to the given set of store files, - * And set the ScanQueryMatcher for each store file scanner for further - * optimization + * Return an array of scanners corresponding to the given set of store files, And set the + * ScanQueryMatcher for each store file scanner for further optimization */ - public static List getScannersForStoreFiles( - Collection files, boolean cacheBlocks, boolean usePread, - boolean isCompaction, boolean canUseDrop, + public static List getScannersForStoreFiles(Collection files, + boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop, ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException { - List scanners = new ArrayList( - files.size()); + List scanners = new ArrayList(files.size()); List sorted_files = new ArrayList<>(files); Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID); for (int i = 0; i < sorted_files.size(); i++) { StoreFileReader r = sorted_files.get(i).createReader(); r.setReplicaStoreFile(isPrimaryReplica); - StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, - isCompaction, readPt, i); - scanner.setScanQueryMatcher(matcher); + StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt, + i, matcher != null ? !matcher.hasNullColumnInQuery() : false); scanners.add(scanner); } return scanners; @@ -360,12 +349,12 @@ public class StoreFileScanner implements KeyValueScanner { // check ROWCOL Bloom filter first. 
if (reader.getBloomFilterType() == BloomType.ROWCOL) { haveToSeek = reader.passesGeneralRowColBloomFilter(kv); - } else if (this.matcher != null && !matcher.hasNullColumnInQuery() && - ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) { + } else if (canOptimizeForNonNullColumn + && ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) { // if there is no such delete family kv in the store file, // then no need to seek. - haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), - kv.getRowOffset(), kv.getRowLength()); + haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), kv.getRowOffset(), + kv.getRowLength()); } } @@ -434,10 +423,6 @@ public class StoreFileScanner implements KeyValueScanner { } } - public void setScanQueryMatcher(ScanQueryMatcher matcher) { - this.matcher = matcher; - } - @Override public boolean isFileScanner() { return true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 080bb958c1e..91a77eaee58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -42,14 +44,16 @@ import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; +import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; +import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.common.annotations.VisibleForTesting; - /** * Scanner scans both the memstore and the Store. Coalesce KeyValue stream * into List<KeyValue> for a single row. @@ -180,6 +184,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected void addCurrentScanners(List scanners) { this.currentScanners.addAll(scanners); } + /** * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we * are not in a compaction. 
@@ -196,9 +201,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (columns != null && scan.isRaw()) { throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); } - matcher = new ScanQueryMatcher(scan, scanInfo, columns, - ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, - oldestUnexpiredTS, now, store.getCoprocessorHost()); + matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, + store.getCoprocessorHost()); this.store.addChangedReaderObserver(this); @@ -267,13 +271,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { this(store, scan, scanInfo, null, - ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false); - if (dropDeletesFromRow == null) { - matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint, - earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost()); + ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false); + if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0) + || (scan.getStopRow() != null && scan.getStopRow().length > 0) + || !scan.getTimeRange().isAllTime()) { + // use legacy query matcher since we do not consider the scan object in our code. Only used to + // keep compatibility for coprocessor. + matcher = LegacyScanQueryMatcher.create(scan, scanInfo, null, scanType, smallestReadPoint, + earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, + store.getCoprocessorHost()); } else { - matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs, - oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); + matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, + earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, + store.getCoprocessorHost()); } // Filter the list of scanners using Bloom filters, time range, TTL, etc. @@ -306,18 +316,27 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner 0); } - public StoreScanner(final Scan scan, ScanInfo scanInfo, - ScanType scanType, final NavigableSet columns, - final List scanners, long earliestPutTs, long readPt) - throws IOException { + public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, + final NavigableSet columns, final List scanners, long earliestPutTs, + long readPt) throws IOException { this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks()); - this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, - Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null); - - // In unit tests, the store could be null - if (this.store != null) { - this.store.addChangedReaderObserver(this); + if (scanType == ScanType.USER_SCAN) { + this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, + null); + } else { + if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0) + || (scan.getStopRow() != null && scan.getStopRow().length > 0) + || !scan.getTimeRange().isAllTime() || columns != null) { + // use legacy query matcher since we do not consider the scan object in our code. Only used + // to keep compatibility for coprocessor. 
+ matcher = LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, Long.MAX_VALUE, + earliestPutTs, oldestUnexpiredTS, now, null, null, store.getCoprocessorHost()); + } else { + this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, + earliestPutTs, oldestUnexpiredTS, now, null, null, null); + } } + // Seek all scanners to the initial key seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); addCurrentScanners(scanners); @@ -515,7 +534,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing // rows. Else it is possible we are still traversing the same row so we must perform the row // comparison. - if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null) { + if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { this.countPerRow = 0; matcher.setToNewRow(cell); } @@ -544,111 +563,111 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner ScanQueryMatcher.MatchCode qcode = matcher.match(cell); qcode = optimize(qcode, cell); switch (qcode) { - case INCLUDE: - case INCLUDE_AND_SEEK_NEXT_ROW: - case INCLUDE_AND_SEEK_NEXT_COL: + case INCLUDE: + case INCLUDE_AND_SEEK_NEXT_ROW: + case INCLUDE_AND_SEEK_NEXT_COL: - Filter f = matcher.getFilter(); - if (f != null) { - cell = f.transformCell(cell); - } + Filter f = matcher.getFilter(); + if (f != null) { + cell = f.transformCell(cell); + } - this.countPerRow++; - if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { - // do what SEEK_NEXT_ROW does. + this.countPerRow++; + if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { + // do what SEEK_NEXT_ROW does. + if (!matcher.moreRowsMayExistAfter(cell)) { + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + matcher.clearCurrentRow(); + seekToNextRow(cell); + break LOOP; + } + + // add to results only if we have skipped #storeOffset kvs + // also update metric accordingly + if (this.countPerRow > storeOffset) { + outResult.add(cell); + + // Update local tracking information + count++; + totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell); + + // Update the progress of the scanner context + scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell)); + scannerContext.incrementBatchProgress(1); + + if (matcher.isUserScan() && totalBytesRead > maxRowSize) { + throw new RowTooBigException( + "Max row size allowed: " + maxRowSize + ", but the row is bigger than that."); + } + } + + if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + if (!matcher.moreRowsMayExistAfter(cell)) { + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + matcher.clearCurrentRow(); + seekToNextRow(cell); + } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + seekAsDirection(matcher.getKeyForNextColumn(cell)); + } else { + this.heap.next(); + } + + if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { + break LOOP; + } + if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { + break LOOP; + } + continue; + + case DONE: + // Optimization for Gets! If DONE, no more to get on this row, early exit! + if (this.scan.isGetScan()) { + // Then no more to this row... exit. 
+ close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + matcher.clearCurrentRow(); + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + + case DONE_SCAN: + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + + case SEEK_NEXT_ROW: + // This is just a relatively simple end of scan fix, to short-cut end + // us if there is an endKey in the scan. if (!matcher.moreRowsMayExistAfter(cell)) { close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } - matcher.curCell = null; + matcher.clearCurrentRow(); seekToNextRow(cell); - break LOOP; - } + break; - // add to results only if we have skipped #storeOffset kvs - // also update metric accordingly - if (this.countPerRow > storeOffset) { - outResult.add(cell); - - // Update local tracking information - count++; - totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell); - - // Update the progress of the scanner context - scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell)); - scannerContext.incrementBatchProgress(1); - - if (matcher.isUserScan() && totalBytesRead > maxRowSize) { - throw new RowTooBigException( - "Max row size allowed: " + maxRowSize + ", but the row is bigger than that."); - } - } - - if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { - if (!matcher.moreRowsMayExistAfter(cell)) { - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - matcher.curCell = null; - seekToNextRow(cell); - } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + case SEEK_NEXT_COL: seekAsDirection(matcher.getKeyForNextColumn(cell)); - } else { + break; + + case SKIP: this.heap.next(); - } + break; - if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { - break LOOP; - } - if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { - break LOOP; - } - continue; + case SEEK_NEXT_USING_HINT: + Cell nextKV = matcher.getNextKeyHint(cell); + if (nextKV != null) { + seekAsDirection(nextKV); + } else { + heap.next(); + } + break; - case DONE: - // Optimization for Gets! If DONE, no more to get on this row, early exit! - if (this.scan.isGetScan()) { - // Then no more to this row... exit. - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - matcher.curCell = null; - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); - - case DONE_SCAN: - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - - case SEEK_NEXT_ROW: - // This is just a relatively simple end of scan fix, to short-cut end - // us if there is an endKey in the scan. 
-          if (!matcher.moreRowsMayExistAfter(cell)) {
-            close(false);// Do all cleanup except heap.close()
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-          }
-          matcher.curCell = null;
-          seekToNextRow(cell);
-          break;
-
-        case SEEK_NEXT_COL:
-          seekAsDirection(matcher.getKeyForNextColumn(cell));
-          break;
-
-        case SKIP:
-          this.heap.next();
-          break;
-
-        case SEEK_NEXT_USING_HINT:
-          Cell nextKV = matcher.getNextKeyHint(cell);
-          if (nextKV != null) {
-            seekAsDirection(nextKV);
-          } else {
-            heap.next();
-          }
-          break;
-
-        default:
-          throw new RuntimeException("UNEXPECTED");
+        default:
+          throw new RuntimeException("UNEXPECTED");
       }
     } while ((cell = this.heap.peek()) != null);
@@ -822,7 +841,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (cell == null) {
       cell = lastTopKey;
     }
-    if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
+    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
       this.countPerRow = 0;
       // The setToNewRow will call reset internally
       matcher.setToNewRow(cell);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java
similarity index 84%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java
index 71ea1bd6c7e..74b908492c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java
@@ -16,18 +16,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
- * Simple wrapper for a byte buffer and a counter.  Does not copy.
+ * Simple wrapper for a byte buffer and a counter. Does not copy.
  * <p>
  * NOT thread-safe because it is not used in a multi-threaded context, yet.
  */
 @InterfaceAudience.Private
-public class ColumnCount {
-  private final byte [] bytes;
+class ColumnCount {
+  private final byte[] bytes;
   private final int offset;
   private final int length;
   private int count;
@@ -36,7 +36,7 @@ public class ColumnCount {
    * Constructor
    * @param column the qualifier to count the versions for
    */
-  public ColumnCount(byte [] column) {
+  public ColumnCount(byte[] column) {
     this(column, 0);
   }
 
@@ -45,7 +45,7 @@ public class ColumnCount {
    * @param column the qualifier to count the versions for
    * @param count initial count
    */
-  public ColumnCount(byte [] column, int count) {
+  public ColumnCount(byte[] column, int count) {
     this(column, 0, column.length, count);
   }
 
@@ -56,7 +56,7 @@ public class ColumnCount {
    * @param length of the qualifier
    * @param count initial count
    */
-  public ColumnCount(byte [] column, int offset, int length, int count) {
+  public ColumnCount(byte[] column, int offset, int length, int count) {
     this.bytes = column;
     this.offset = offset;
     this.length = length;
@@ -66,21 +66,21 @@ public class ColumnCount {
   /**
    * @return the buffer
    */
-  public byte [] getBuffer(){
+  public byte[] getBuffer() {
     return this.bytes;
   }
 
   /**
    * @return the offset
   */
-  public int getOffset(){
+  public int getOffset() {
     return this.offset;
   }
 
   /**
    * @return the length
   */
-  public int getLength(){
+  public int getLength() {
     return this.length;
   }
 
@@ -107,5 +107,4 @@ public class ColumnCount {
   public void setCount(int count) {
     this.count = count;
   }
-
 }
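Illustrative aside, not part of the patch: a minimal sketch of how the relocated ColumnCount wrapper is used. It aliases the caller's qualifier array rather than copying it. Only the constructors and accessors visible in the diff above are assumed; the example class itself is hypothetical and sits in the same package because ColumnCount becomes package-private here.

```java
package org.apache.hadoop.hbase.regionserver.querymatcher;

import org.apache.hadoop.hbase.util.Bytes;

final class ColumnCountSketch {
  static void demo() {
    byte[] qualifier = Bytes.toBytes("q1");
    // Wrap the qualifier with an initial version count of 3; the array is not copied.
    ColumnCount cc = new ColumnCount(qualifier, 3);
    assert cc.getBuffer() == qualifier;        // same array reference, no copy
    assert cc.getOffset() == 0;
    assert cc.getLength() == qualifier.length;
    cc.setCount(2);                            // trackers adjust the remaining count
  }
}
```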

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
similarity index 95%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
index d352561a291..17c6afe0d89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
@@ -16,13 +16,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
 
 /**
  * Implementing classes of this interface will be used for the tracking
@@ -43,8 +43,8 @@ import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
  * believes that the current column should be skipped (by timestamp, filter etc.)
  *
  * <p>
- * These two methods returns a
- * {@link org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode}
+ * These two methods return a
+ * {@link org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode}
  * to define what action should be taken.
  * <p>
  * This class is NOT thread-safe as queries are never multi-threaded
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java
new file mode 100644
index 00000000000..d3224dceab1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+
+/**
+ * Query matcher for compaction.
+ */
+@InterfaceAudience.Private
+public abstract class CompactionScanQueryMatcher extends ScanQueryMatcher {
+
+  /** readPoint over which the KVs are unconditionally included */
+  protected final long maxReadPointToTrackVersions;
+
+  /** Keeps track of deletes */
+  protected final DeleteTracker deletes;
+
+  /** whether to return deleted rows */
+  protected final KeepDeletedCells keepDeletedCells;
+
+  protected CompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
+      long readPointToUse, long oldestUnexpiredTS, long now) {
+    super(HConstants.EMPTY_START_ROW, scanInfo,
+        new ScanWildcardColumnTracker(scanInfo.getMinVersions(), scanInfo.getMaxVersions(),
+            oldestUnexpiredTS),
+        oldestUnexpiredTS, now);
+    this.maxReadPointToTrackVersions = readPointToUse;
+    this.deletes = deletes;
+    this.keepDeletedCells = scanInfo.getKeepDeletedCells();
+  }
+
+  @Override
+  public boolean hasNullColumnInQuery() {
+    return true;
+  }
+
+  @Override
+  public boolean isUserScan() {
+    return false;
+  }
+
+  @Override
+  public boolean moreRowsMayExistAfter(Cell cell) {
+    return true;
+  }
+
+  @Override
+  public Filter getFilter() {
+    // no filter when compaction
+    return null;
+  }
+
+  @Override
+  public Cell getNextKeyHint(Cell cell) throws IOException {
+    // no filter, so no key hint.
+    return null;
+  }
+
+  @Override
+  protected void reset() {
+    deletes.reset();
+  }
+
+  protected final void trackDelete(Cell cell) {
+    // If keepDeletedCells is true, then we only remove cells by versions or TTL during
+    // compaction, so we do not need to track delete here.
+    // If keepDeletedCells is TTL and the delete marker is expired, then we can be sure that
+    // minVersions is larger than 0 (otherwise we would just return at preCheck). So here we still
+    // need to track the delete marker to see if it masks some cells.
+    if (keepDeletedCells == KeepDeletedCells.FALSE
+        || (keepDeletedCells == KeepDeletedCells.TTL && cell.getTimestamp() < oldestUnexpiredTS)) {
+      deletes.add(cell);
+    }
+  }
+
+  public static CompactionScanQueryMatcher create(ScanInfo scanInfo, ScanType scanType,
+      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now,
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow,
+      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+    DeleteTracker deleteTracker = instantiateDeleteTracker(regionCoprocessorHost);
+    if (dropDeletesFromRow == null) {
+      if (scanType == ScanType.COMPACT_RETAIN_DELETES) {
+        return new MinorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse,
+            oldestUnexpiredTS, now);
+      } else {
+        return new MajorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse,
+            earliestPutTs, oldestUnexpiredTS, now);
+      }
+    } else {
+      return new StripeCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse,
+          earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow);
+    }
+  }
+}
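For illustration only, not part of the new file above: a sketch of how compaction code might obtain a matcher through the create() factory. The helper class and method are hypothetical; only the factory signature and ScanType.COMPACT_RETAIN_DELETES are taken from the diff, and passing null for the dropDeletesFromRow/dropDeletesToRow boundaries selects the non-stripe variants.

```java
package org.apache.hadoop.hbase.regionserver.querymatcher;

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;

final class CompactionMatcherSketch {
  /**
   * Builds a matcher for an ordinary (non-stripe) compaction. With the dropDeletes
   * boundaries left null, create() chooses the minor variant for COMPACT_RETAIN_DELETES
   * and the major variant for any other scan type.
   */
  static CompactionScanQueryMatcher newMatcher(ScanInfo scanInfo, ScanType scanType,
      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now,
      RegionCoprocessorHost host) throws IOException {
    return CompactionScanQueryMatcher.create(scanInfo, scanType, readPointToUse, earliestPutTs,
      oldestUnexpiredTS, now, null, null, host);
  }
}
```

Under these assumptions, a minor compaction (COMPACT_RETAIN_DELETES) yields the MinorCompactionScanQueryMatcher, any other scan type yields the major variant, and supplying drop-deletes boundaries instead yields the stripe variant.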

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
similarity index 81%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeleteTracker.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
index 8f466fc720f..4e1ba4ec2e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeleteTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -16,17 +16,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 
 /**
- * This interface is used for the tracking and enforcement of Deletes
- * during the course of a Get or Scan operation.
+ * This interface is used for the tracking and enforcement of Deletes during the course of a Get or
+ * Scan operation.
  * <p>
  * This class is utilized through three methods:
- *