HBASE-16324 Remove LegacyScanQueryMatcher

This commit is contained in:
zhangduo 2017-08-25 17:02:03 +08:00
parent b55b952d5c
commit 95bc464527
21 changed files with 552 additions and 1045 deletions

View File

@ -19,9 +19,9 @@
package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.OptionalInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -194,9 +194,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
// take default action
return null;
}
Scan scan = new Scan();
scan.setMaxVersions(scanInfo.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, scanners,
return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
@ -210,9 +208,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
// take default action
return null;
}
Scan scan = new Scan();
scan.setMaxVersions(scanInfo.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, scanners, scanType,
return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners, scanType,
store.getSmallestReadPoint(), earliestPutTs);
}

View File

@ -22,6 +22,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.OptionalInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -32,7 +33,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
@ -74,9 +74,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners,
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions());
return new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
smallestReadPoint, fd.earliestPutTs);
}
};

View File

@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.crypto.Encryption;
@ -805,15 +804,12 @@ public class PartitionedMobCompactor extends MobCompactor {
* @throws IOException if IO failure is encountered
*/
private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
throws IOException {
throws IOException {
List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
false, true, false, false, HConstants.LATEST_TIMESTAMP);
Scan scan = new Scan();
scan.setMaxVersions(column.getMaxVersions());
long ttl = HStore.determineTTLFromFamily(column);
ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.COMPARATOR);
return new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
HConstants.LATEST_TIMESTAMP);
return new StoreScanner(scanInfo, scanType, scanners);
}
/**

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;

View File

@ -19,16 +19,16 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalInt;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
@ -106,23 +106,15 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
/**
* Creates the scanner for compacting the pipeline.
*
* @return the scanner
*/
private StoreScanner createScanner(Store store, List<KeyValueScanner> scanners)
throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(); //Get all available versions
StoreScanner internalScanner =
new StoreScanner(store, store.getScanInfo(), scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
return internalScanner;
// Get all available versions
return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(Integer.MAX_VALUE), scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
/* Refill kev-value set (should be invoked only when KVS is empty)
* Returns true if KVS is non-empty */
private boolean refillKVS() {

View File

@ -53,11 +53,9 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
}
/** Constructor for testing. */
ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners)
throws IOException {
super(scan, scanInfo, scanType, columns, scanners,
HConstants.LATEST_TIMESTAMP);
ReversedStoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
List<? extends KeyValueScanner> scanners) throws IOException {
super(scan, scanInfo, columns, scanners);
}
@Override

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@ -86,11 +85,8 @@ abstract class StoreFlusher {
smallestReadPoint);
}
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(store.getScanInfo().getMaxVersions());
scanner = new StoreScanner(store, store.getScanInfo(), scan,
snapshotScanners, ScanType.COMPACT_RETAIN_DELETES,
smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
scanner = new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners,
ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
}
assert scanner != null;
if (store.getCoprocessorHost() != null) {

View File

@ -24,6 +24,8 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.CollectionUtils;
@ -66,7 +67,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
private static final Log LOG = LogFactory.getLog(StoreScanner.class);
// In unit tests, the store could be null
protected final Store store;
protected final Optional<Store> store;
private ScanQueryMatcher matcher;
protected KeyValueHeap heap;
private boolean cacheBlocks;
@ -166,14 +167,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
/** An internal constructor. */
protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks, ScanType scanType) {
private StoreScanner(Optional<Store> store, Scan scan, ScanInfo scanInfo,
int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) {
this.readPt = readPt;
this.store = store;
this.cacheBlocks = cacheBlocks;
get = scan.isGetScan();
int numCol = columns == null ? 0 : columns.size();
explicitColumnQuery = numCol > 0;
explicitColumnQuery = numColumns > 0;
this.scan = scan;
this.now = EnvironmentEdgeManager.currentTime();
this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
@ -183,13 +183,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// the seek operation. However, we also look the row-column Bloom filter
// for multi-row (non-"get") scans because this is not done in
// StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1);
this.maxRowSize = scanInfo.getTableMaxRowSize();
if (get) {
this.readType = Scan.ReadType.PREAD;
this.scanUsePread = true;
} else if(scanType != scanType.USER_SCAN) {
} else if(scanType != ScanType.USER_SCAN) {
// For compaction scanners never use Pread as already we have stream based scanners on the
// store files to be compacted
this.readType = Scan.ReadType.STREAM;
@ -207,13 +206,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.preadMaxBytes = scanInfo.getPreadMaxBytes();
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
// Parallel seeking is on if the config allows and more there is more than one store file.
if (this.store != null && this.store.getStorefilesCount() > 1) {
RegionServerServices rsService = ((HStore) store).getHRegion().getRegionServerServices();
if (rsService != null && scanInfo.isParallelSeekEnabled()) {
this.parallelSeekEnabled = true;
this.executor = rsService.getExecutorService();
this.store.ifPresent(s -> {
if (s.getStorefilesCount() > 1) {
RegionServerServices rsService = ((HStore) s).getHRegion().getRegionServerServices();
if (rsService != null && scanInfo.isParallelSeekEnabled()) {
this.parallelSeekEnabled = true;
this.executor = rsService.getExecutorService();
}
}
}
});
}
private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
@ -229,21 +230,23 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @param columns which columns we are scanning
* @throws IOException
*/
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
long readPt)
throws IOException {
this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks(), ScanType.USER_SCAN);
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
this(Optional.of(store), scan, scanInfo, columns != null ? columns.size() : 0, readPt,
scan.getCacheBlocks(), ScanType.USER_SCAN);
if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
}
matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
store.getCoprocessorHost());
this.store.addChangedReaderObserver(this);
store.addChangedReaderObserver(this);
try {
// Pass columns to try to filter out unnecessary StoreFiles.
List<KeyValueScanner> scanners = getScannersNoCompaction();
List<KeyValueScanner> scanners = selectScannersFrom(store,
store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
// Seek all scanners to the start of the Row (or if the exact matching row
// key does not exist, then to the start of the next matching Row).
@ -263,66 +266,61 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
} catch (IOException e) {
// remove us from the HStore#changedReaderObservers here or we'll have no chance to
// and might cause memory leak
this.store.deleteChangedReaderObserver(this);
store.deleteChangedReaderObserver(this);
throw e;
}
}
// a dummy scan instance for compaction.
private static final Scan SCAN_FOR_COMPACTION = new Scan();
/**
* Used for compactions.<p>
*
* Used for compactions.
* <p>
* Opens a scanner across specified StoreFiles.
* @param store who we scan
* @param scan the spec
* @param scanners ancillary scanners
* @param smallestReadPoint the readPoint that we should use for tracking
* versions
* @param smallestReadPoint the readPoint that we should use for tracking versions
*/
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType,
long smallestReadPoint, long earliestPutTs) throws IOException {
this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
public StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs) throws IOException {
this(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs, null,
null);
}
/**
* Used for compactions that drop deletes from a limited range of rows.<p>
*
* Used for compactions that drop deletes from a limited range of rows.
* <p>
* Opens a scanner across specified StoreFiles.
* @param store who we scan
* @param scan the spec
* @param scanners ancillary scanners
* @param smallestReadPoint the readPoint that we should use for tracking versions
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
*/
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
public StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions,
List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
this(store, scanInfo, maxVersions, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
private StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
this(store, scan, scanInfo, null,
((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
|| (scan.getStopRow() != null && scan.getStopRow().length > 0)
|| !scan.getTimeRange().isAllTime()) {
// use legacy query matcher since we do not consider the scan object in our code. Only used to
// keep compatibility for coprocessor.
matcher = LegacyScanQueryMatcher.create(scan, scanInfo, null, scanType, smallestReadPoint,
earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
store.getCoprocessorHost());
} else {
matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint,
earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
store.getCoprocessorHost());
}
this(Optional.of(store),
maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
: SCAN_FOR_COMPACTION,
scanInfo, 0, ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED),
false, scanType);
assert scanType != ScanType.USER_SCAN;
matcher =
CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
scanners = selectScannersFrom(scanners);
scanners = selectScannersFrom(store, scanners);
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
@ -331,62 +329,46 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
resetKVHeap(scanners, store.getComparator());
}
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<? extends KeyValueScanner> scanners) throws IOException {
this(scan, scanInfo, scanType, columns, scanners,
HConstants.LATEST_TIMESTAMP,
// 0 is passed as readpoint because the test bypasses Store
0);
}
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<? extends KeyValueScanner> scanners, long earliestPutTs)
throws IOException {
this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
// 0 is passed as readpoint because the test bypasses Store
0);
}
public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs,
long readPt) throws IOException {
this(null, scan, scanInfo, columns, readPt,
scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false, scanType);
if (scanType == ScanType.USER_SCAN) {
this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
null);
} else {
if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
|| (scan.getStopRow() != null && scan.getStopRow().length > 0)
|| !scan.getTimeRange().isAllTime() || columns != null) {
// use legacy query matcher since we do not consider the scan object in our code. Only used
// to keep compatibility for coprocessor.
matcher = LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, Long.MAX_VALUE,
earliestPutTs, oldestUnexpiredTS, now, null, null, store.getCoprocessorHost());
} else {
this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
earliestPutTs, oldestUnexpiredTS, now, null, null, null);
}
}
private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
throws IOException {
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
addCurrentScanners(scanners);
resetKVHeap(scanners, scanInfo.getComparator());
}
/**
* Get a filtered list of scanners. Assumes we are not in a compaction.
* @return list of scanners to seek
*/
private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
return selectScannersFrom(
store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
// For mob compaction only as we do not have a Store instance when doing mob compaction.
public StoreScanner(ScanInfo scanInfo, ScanType scanType,
List<? extends KeyValueScanner> scanners) throws IOException {
this(Optional.empty(), SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
assert scanType != ScanType.USER_SCAN;
this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
oldestUnexpiredTS, now, null, null, null);
seekAllScanner(scanInfo, scanners);
}
// Used to instantiate a scanner for user scan in test
@VisibleForTesting
StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
List<? extends KeyValueScanner> scanners) throws IOException {
// 0 is passed as readpoint because the test bypasses Store
this(Optional.empty(), scan, scanInfo, columns != null ? columns.size() : 0, 0L,
scan.getCacheBlocks(), ScanType.USER_SCAN);
this.matcher =
UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
seekAllScanner(scanInfo, scanners);
}
// Used to instantiate a scanner for compaction in test
@VisibleForTesting
StoreScanner(ScanInfo scanInfo, OptionalInt maxVersions, ScanType scanType,
List<? extends KeyValueScanner> scanners) throws IOException {
// 0 is passed as readpoint because the test bypasses Store
this(Optional.empty(), maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
: SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
seekAllScanner(scanInfo, scanners);
}
@VisibleForTesting
@ -439,18 +421,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
/**
* Filters the given list of scanners using Bloom filter, time range, and
* TTL.
* Filters the given list of scanners using Bloom filter, time range, and TTL.
* <p>
* Will be overridden by testcase so declared as protected.
*/
@VisibleForTesting
protected List<KeyValueScanner> selectScannersFrom(
final List<? extends KeyValueScanner> allScanners) {
protected List<KeyValueScanner> selectScannersFrom(Store store,
List<? extends KeyValueScanner> allScanners) {
boolean memOnly;
boolean filesOnly;
if (scan instanceof InternalScan) {
InternalScan iscan = (InternalScan)scan;
InternalScan iscan = (InternalScan) scan;
memOnly = iscan.isCheckOnlyMemStore();
filesOnly = iscan.isCheckOnlyStoreFiles();
} else {
@ -462,7 +443,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// We can only exclude store files based on TTL if minVersions is set to 0.
// Otherwise, we might have to return KVs that have technically expired.
long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS: Long.MIN_VALUE;
long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;
// include only those scan files which pass all filters
for (KeyValueScanner kvs : allScanners) {
@ -503,10 +484,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (withDelayedScannersClose) {
this.closing = true;
}
// Under test, we dont have a this.store
if (this.store != null) {
this.store.deleteChangedReaderObserver(this);
}
// For mob compaction, we do not have a store.
this.store.ifPresent(s -> s.deleteChangedReaderObserver(this));
if (withDelayedScannersClose) {
clearAndClose(scannersForDelayedClose);
clearAndClose(memStoreScannersAfterFlush);
@ -583,7 +562,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Only do a sanity-check if store and comparator are available.
CellComparator comparator = store != null ? store.getComparator() : null;
CellComparator comparator = store.map(s -> s.getComparator()).orElse(null);
int count = 0;
long totalBytesRead = 0;
@ -895,6 +874,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @return if top of heap has changed (and KeyValueHeap has to try the next KV)
*/
protected final boolean reopenAfterFlush() throws IOException {
// here we can make sure that we have a Store instance.
Store store = this.store.get();
Cell lastTop = heap.peek();
// When we have the scan object, should we not pass it to getScanners() to get a limited set of
// scanners? We did so in the constructor and we could have done it now by storing the scan
@ -906,7 +887,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get,
scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
allScanners.addAll(memStoreScannersAfterFlush);
scanners = selectScannersFrom(allScanners);
scanners = selectScannersFrom(store, allScanners);
// Clear the current set of flushed store files so that they don't get added again
flushedStoreFiles.clear();
memStoreScannersAfterFlush.clear();
@ -998,8 +979,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@VisibleForTesting
void trySwitchToStreamRead() {
if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||
bytesRead < preadMaxBytes) {
if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing ||
heap.peek() == null || bytesRead < preadMaxBytes) {
return;
}
if (LOG.isDebugEnabled()) {
@ -1021,6 +1002,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
List<KeyValueScanner> fileScanners = null;
List<KeyValueScanner> newCurrentScanners;
KeyValueHeap newHeap;
// We must have a store instance here
Store store = this.store.get();
try {
// recreate the scanners on the current file scanners
fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
@ -499,10 +499,8 @@ public abstract class Compactor<T extends CellSink> {
*/
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions());
return new StoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs);
return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
smallestReadPoint, earliestPutTs);
}
/**
@ -515,11 +513,9 @@ public abstract class Compactor<T extends CellSink> {
* @return A compaction scanner.
*/
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions());
return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners,
smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
}

View File

@ -1,384 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.querymatcher;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
/**
* The old query matcher implementation. Used to keep compatibility for coprocessor that could
* overwrite the StoreScanner before compaction. Should be removed once we find a better way to do
* filtering during compaction.
*/
@Deprecated
@InterfaceAudience.Private
public class LegacyScanQueryMatcher extends ScanQueryMatcher {
private final TimeRange tr;
private final Filter filter;
/** Keeps track of deletes */
private final DeleteTracker deletes;
/**
* The following three booleans define how we deal with deletes. There are three different
* aspects:
* <ol>
* <li>Whether to keep delete markers. This is used in compactions. Minor compactions always keep
* delete markers.</li>
* <li>Whether to keep deleted rows. This is also used in compactions, if the store is set to keep
* deleted rows. This implies keeping the delete markers as well.</li> In this case deleted rows
* are subject to the normal max version and TTL/min version rules just like "normal" rows.
* <li>Whether a scan can do time travel queries even before deleted marker to reach deleted
* rows.</li>
* </ol>
*/
/** whether to retain delete markers */
private boolean retainDeletesInOutput;
/** whether to return deleted rows */
private final KeepDeletedCells keepDeletedCells;
// By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete
// marker is always removed during a major compaction. If set to non-zero
// value then major compaction will try to keep a delete marker around for
// the given number of milliseconds. We want to keep the delete markers
// around a bit longer because old puts might appear out-of-order. For
// example, during log replication between two clusters.
//
// If the delete marker has lived longer than its column-family's TTL then
// the delete marker will be removed even if time.to.purge.deletes has not
// passed. This is because all the Puts that this delete marker can influence
// would have also expired. (Removing of delete markers on col family TTL will
// not happen if min-versions is set to non-zero)
//
// But, if time.to.purge.deletes has not expired then a delete
// marker will not be removed just because there are no Puts that it is
// currently influencing. This is because Puts, that this delete can
// influence. may appear out of order.
private final long timeToPurgeDeletes;
/**
* This variable shows whether there is an null column in the query. There always exists a null
* column in the wildcard column query. There maybe exists a null column in the explicit column
* query based on the first column.
*/
private final boolean hasNullColumn;
/** readPoint over which the KVs are unconditionally included */
private final long maxReadPointToTrackVersions;
/**
* Oldest put in any of the involved store files Used to decide whether it is ok to delete family
* delete marker of this store keeps deleted KVs.
*/
protected final long earliestPutTs;
private final byte[] stopRow;
private byte[] dropDeletesFromRow = null, dropDeletesToRow = null;
private LegacyScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
boolean hasNullColumn, DeleteTracker deletes, ScanType scanType, long readPointToUse,
long earliestPutTs, long oldestUnexpiredTS, long now) {
super(createStartKeyFromRow(scan.getStartRow(), scanInfo), scanInfo, columns, oldestUnexpiredTS,
now);
TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily());
if (timeRange == null) {
this.tr = scan.getTimeRange();
} else {
this.tr = timeRange;
}
this.hasNullColumn = hasNullColumn;
this.deletes = deletes;
this.filter = scan.getFilter();
this.maxReadPointToTrackVersions = readPointToUse;
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
this.earliestPutTs = earliestPutTs;
/* how to deal with deletes */
this.keepDeletedCells = scanInfo.getKeepDeletedCells();
this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES;
this.stopRow = scan.getStopRow();
}
private LegacyScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
boolean hasNullColumn, DeleteTracker deletes, ScanType scanType, long readPointToUse,
long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) {
this(scan, scanInfo, columns, hasNullColumn, deletes, scanType, readPointToUse, earliestPutTs,
oldestUnexpiredTS, now);
this.dropDeletesFromRow = Preconditions.checkNotNull(dropDeletesFromRow);
this.dropDeletesToRow = Preconditions.checkNotNull(dropDeletesToRow);
}
@Override
public void beforeShipped() throws IOException {
super.beforeShipped();
deletes.beforeShipped();
}
@Override
public MatchCode match(Cell cell) throws IOException {
if (filter != null && filter.filterAllRemaining()) {
return MatchCode.DONE_SCAN;
}
MatchCode returnCode = preCheck(cell);
if (returnCode != null) {
return returnCode;
}
/*
* The delete logic is pretty complicated now.
* This is corroborated by the following:
* 1. The store might be instructed to keep deleted rows around.
* 2. A scan can optionally see past a delete marker now.
* 3. If deleted rows are kept, we have to find out when we can
* remove the delete markers.
* 4. Family delete markers are always first (regardless of their TS)
* 5. Delete markers should not be counted as version
* 6. Delete markers affect puts of the *same* TS
* 7. Delete marker need to be version counted together with puts
* they affect
*/
long timestamp = cell.getTimestamp();
byte typeByte = cell.getTypeByte();
long mvccVersion = cell.getSequenceId();
if (CellUtil.isDelete(typeByte)) {
if (keepDeletedCells == KeepDeletedCells.FALSE
|| (keepDeletedCells == KeepDeletedCells.TTL && timestamp < oldestUnexpiredTS)) {
// first ignore delete markers if the scanner can do so, and the
// range does not include the marker
//
// during flushes and compactions also ignore delete markers newer
// than the readpoint of any open scanner, this prevents deleted
// rows that could still be seen by a scanner from being collected
boolean includeDeleteMarker = tr.withinOrAfterTimeRange(timestamp);
if (includeDeleteMarker && mvccVersion <= maxReadPointToTrackVersions) {
this.deletes.add(cell);
}
// Can't early out now, because DelFam come before any other keys
}
if (timeToPurgeDeletes > 0
&& (EnvironmentEdgeManager.currentTime() - timestamp) <= timeToPurgeDeletes) {
return MatchCode.INCLUDE;
} else if (retainDeletesInOutput || mvccVersion > maxReadPointToTrackVersions) {
// always include or it is not time yet to check whether it is OK
// to purge deltes or not
// if this is not a user scan (compaction), we can filter this deletemarker right here
// otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
return MatchCode.INCLUDE;
} else if (keepDeletedCells == KeepDeletedCells.TRUE
|| (keepDeletedCells == KeepDeletedCells.TTL && timestamp >= oldestUnexpiredTS)) {
if (timestamp < earliestPutTs) {
// keeping delete rows, but there are no puts older than
// this delete in the store files.
return columns.getNextRowOrNextColumn(cell);
}
// else: fall through and do version counting on the
// delete markers
} else {
return MatchCode.SKIP;
}
// note the following next else if...
// delete marker are not subject to other delete markers
} else if (!this.deletes.isEmpty()) {
DeleteResult deleteResult = deletes.isDeleted(cell);
switch (deleteResult) {
case FAMILY_DELETED:
case COLUMN_DELETED:
return columns.getNextRowOrNextColumn(cell);
case VERSION_DELETED:
case FAMILY_VERSION_DELETED:
return MatchCode.SKIP;
case NOT_DELETED:
break;
default:
throw new RuntimeException("UNEXPECTED");
}
}
int timestampComparison = tr.compare(timestamp);
if (timestampComparison >= 1) {
return MatchCode.SKIP;
} else if (timestampComparison <= -1) {
return columns.getNextRowOrNextColumn(cell);
}
// STEP 1: Check if the column is part of the requested columns
MatchCode colChecker = columns.checkColumn(cell, typeByte);
if (colChecker == MatchCode.INCLUDE) {
ReturnCode filterResponse = ReturnCode.SKIP;
// STEP 2: Yes, the column is part of the requested columns. Check if filter is present
if (filter != null) {
// STEP 3: Filter the key value and return if it filters out
filterResponse = filter.filterKeyValue(cell);
switch (filterResponse) {
case SKIP:
return MatchCode.SKIP;
case NEXT_COL:
return columns.getNextRowOrNextColumn(cell);
case NEXT_ROW:
return MatchCode.SEEK_NEXT_ROW;
case SEEK_NEXT_USING_HINT:
return MatchCode.SEEK_NEXT_USING_HINT;
default:
//It means it is either include or include and seek next
break;
}
}
/*
* STEP 4: Reaching this step means the column is part of the requested columns and either
* the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
* Now check the number of versions needed. This method call returns SKIP, INCLUDE,
* INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
*
* FilterResponse ColumnChecker Desired behavior
* INCLUDE SKIP row has already been included, SKIP.
* INCLUDE INCLUDE INCLUDE
* INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
* INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
* INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP.
* INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL
* INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
* INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
*
* In all the above scenarios, we return the column checker return value except for
* FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
*/
colChecker = columns.checkVersions(cell, timestamp, typeByte,
mvccVersion > maxReadPointToTrackVersions);
if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (colChecker != MatchCode.SKIP) {
return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
}
return MatchCode.SEEK_NEXT_ROW;
}
return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
: colChecker;
}
return colChecker;
}
@Override
public boolean hasNullColumnInQuery() {
return hasNullColumn;
}
/**
* Handle partial-drop-deletes. As we match keys in order, when we have a range from which we can
* drop deletes, we can set retainDeletesInOutput to false for the duration of this range only,
* and maintain consistency.
*/
private void checkPartialDropDeleteRange(Cell curCell) {
// If partial-drop-deletes are used, initially, dropDeletesFromRow and dropDeletesToRow
// are both set, and the matcher is set to retain deletes. We assume ordered keys. When
// dropDeletesFromRow is leq current kv, we start dropping deletes and reset
// dropDeletesFromRow; thus the 2nd "if" starts to apply.
if ((dropDeletesFromRow != null)
&& (Arrays.equals(dropDeletesFromRow, HConstants.EMPTY_START_ROW)
|| (CellComparator.COMPARATOR.compareRows(curCell, dropDeletesFromRow, 0,
dropDeletesFromRow.length) >= 0))) {
retainDeletesInOutput = false;
dropDeletesFromRow = null;
}
// If dropDeletesFromRow is null and dropDeletesToRow is set, we are inside the partial-
// drop-deletes range. When dropDeletesToRow is leq current kv, we stop dropping deletes,
// and reset dropDeletesToRow so that we don't do any more compares.
if ((dropDeletesFromRow == null) && (dropDeletesToRow != null)
&& !Arrays.equals(dropDeletesToRow, HConstants.EMPTY_END_ROW) && (CellComparator.COMPARATOR
.compareRows(curCell, dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) {
retainDeletesInOutput = true;
dropDeletesToRow = null;
}
}
@Override
protected void reset() {
checkPartialDropDeleteRange(currentRow);
}
@Override
public boolean isUserScan() {
return false;
}
@Override
public boolean moreRowsMayExistAfter(Cell cell) {
if (this.stopRow == null || this.stopRow.length == 0) {
return true;
}
return rowComparator.compareRows(cell, stopRow, 0, stopRow.length) < 0;
}
@Override
public Filter getFilter() {
return filter;
}
@Override
public Cell getNextKeyHint(Cell cell) throws IOException {
if (filter == null) {
return null;
} else {
return filter.getNextCellHint(cell);
}
}
public static LegacyScanQueryMatcher create(Scan scan, ScanInfo scanInfo,
NavigableSet<byte[]> columns, ScanType scanType, long readPointToUse, long earliestPutTs,
long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow, byte[] dropDeletesToRow,
RegionCoprocessorHost regionCoprocessorHost) throws IOException {
boolean hasNullColumn =
!(columns != null && columns.size() != 0 && columns.first().length != 0);
Pair<DeleteTracker, ColumnTracker> trackers = getTrackers(regionCoprocessorHost, null,
scanInfo, oldestUnexpiredTS, scan);
DeleteTracker deleteTracker = trackers.getFirst();
ColumnTracker columnTracker = trackers.getSecond();
if (dropDeletesFromRow == null) {
return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deleteTracker,
scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
} else {
return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deleteTracker,
scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow,
dropDeletesToRow);
}
}
}

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@ -269,19 +270,17 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
private InternalScanner createCompactorScanner(Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs)
throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions());
return new CompactorStoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
store.getSmallestReadPoint(), earliestPutTs);
return new CompactorStoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners,
scanType, store.getSmallestReadPoint(), earliestPutTs);
}
}
private static class CompactorStoreScanner extends StoreScanner {
public CompactorStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
public CompactorStoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs) throws IOException {
super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs);
super(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs);
}
@Override

View File

@ -576,9 +576,9 @@ public class TestFromClientSide {
}
@Override
protected List<KeyValueScanner> selectScannersFrom(
protected List<KeyValueScanner> selectScannersFrom(Store store,
List<? extends KeyValueScanner> allScanners) {
List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
for (KeyValueScanner scanner : scanners) {
newScanners.add(new DelegatingKeyValueScanner(scanner) {

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@ -116,18 +117,31 @@ public class TestRegionObserverScannerOpenHook {
}
}
private static final InternalScanner NO_DATA = new InternalScanner() {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return false;
}
@Override
public boolean next(List<Cell> results) throws IOException {
return false;
}
@Override
public void close() throws IOException {}
};
/**
* Don't allow any data in a flush by creating a custom {@link StoreScanner}.
*/
public static class NoDataFromFlush implements RegionObserver {
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
Scan scan = new Scan();
scan.setFilter(new NoDataFilter());
return new StoreScanner(store, store.getScanInfo(), scan,
scanners, ScanType.COMPACT_RETAIN_DELETES,
store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
scanners.forEach(KeyValueScanner::close);
return NO_DATA;
}
}
@ -140,11 +154,8 @@ public class TestRegionObserverScannerOpenHook {
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s) throws IOException {
Scan scan = new Scan();
scan.setFilter(new NoDataFilter());
return new StoreScanner(store, store.getScanInfo(), scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
scanners.forEach(KeyValueScanner::close);
return NO_DATA;
}
}

View File

@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
@ -883,13 +882,10 @@ public class TestPartitionedMobCompactor {
}
List<KeyValueScanner> scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs,
false, true, false, false, HConstants.LATEST_TIMESTAMP));
Scan scan = new Scan();
scan.setMaxVersions(hcd.getMaxVersions());
long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
long ttl = HStore.determineTTLFromFamily(hcd);
ScanInfo scanInfo = new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparator.COMPARATOR);
StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
scanners, 0L, HConstants.LATEST_TIMESTAMP);
StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners);
List<Cell> results = new ArrayList<>();
boolean hasMore = true;

View File

@ -19,9 +19,9 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.OptionalInt;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
* of functionality still behaves as expected.
*/
public class NoOpScanPolicyObserver implements RegionObserver {
/**
* Reimplement the default behavior
*/
@ -45,11 +46,9 @@ public class NoOpScanPolicyObserver implements RegionObserver {
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
ScanInfo oldSI = store.getScanInfo();
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, scanners,
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(),
oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
@ -57,16 +56,15 @@ public class NoOpScanPolicyObserver implements RegionObserver {
* Reimplement the default behavior
*/
@Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s) throws IOException {
// this demonstrates how to override the scanners default behavior
ScanInfo oldSI = store.getScanInfo();
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, scanners, scanType,
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(),
oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners, scanType,
store.getSmallestReadPoint(), earliestPutTs);
}

View File

@ -193,25 +193,25 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
ScanType scanType = ScanType.USER_SCAN;
InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null,
memstore.getScanners(0));
List<Cell> results = new ArrayList<>();
for (int i = 0; scanner.next(results); i++) {
int rowId = startRowId + i;
Cell left = results.get(0);
byte[] row1 = Bytes.toBytes(rowId);
assertTrue("Row name",
try (InternalScanner scanner =
new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
memstore.getScanners(0))) {
List<Cell> results = new ArrayList<>();
for (int i = 0; scanner.next(results); i++) {
int rowId = startRowId + i;
Cell left = results.get(0);
byte[] row1 = Bytes.toBytes(rowId);
assertTrue("Row name",
CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0);
assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
List<Cell> row = new ArrayList<>();
for (Cell kv : results) {
row.add(kv);
assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
List<Cell> row = new ArrayList<>();
for (Cell kv : results) {
row.add(kv);
}
isExpectedRowWithoutTimestamps(rowId, row);
// Clear out set. Otherwise row results accumulate.
results.clear();
}
isExpectedRowWithoutTimestamps(rowId, row);
// Clear out set. Otherwise row results accumulate.
results.clear();
}
}
}

View File

@ -18,9 +18,17 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -55,22 +63,13 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/** memstore test case */
@Category({RegionServerTests.class, MediumTests.class})
@ -164,10 +163,8 @@ public class TestDefaultMemStore {
Configuration conf = HBaseConfiguration.create();
ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP,
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
ScanType scanType = ScanType.USER_SCAN;
StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
int count = 0;
try {
try (StoreScanner s = new StoreScanner(scan, scanInfo, null, memstorescanners)) {
while (s.next(result)) {
LOG.info(result);
count++;
@ -175,8 +172,6 @@ public class TestDefaultMemStore {
assertEquals(rowCount, result.size());
result.clear();
}
} finally {
s.close();
}
assertEquals(rowCount, count);
for (KeyValueScanner scanner : memstorescanners) {
@ -185,9 +180,8 @@ public class TestDefaultMemStore {
memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0;
try {
try (StoreScanner s = new StoreScanner(scan, scanInfo, null, memstorescanners)) {
while (s.next(result)) {
LOG.info(result);
// Assert the stuff is coming out in right order.
@ -201,8 +195,6 @@ public class TestDefaultMemStore {
}
result.clear();
}
} finally {
s.close();
}
assertEquals(rowCount, count);
for (KeyValueScanner scanner : memstorescanners) {
@ -211,10 +203,9 @@ public class TestDefaultMemStore {
memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
// Assert that new values are seen in kvset as we scan.
long ts = System.currentTimeMillis();
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0;
int snapshotIndex = 5;
try {
try (StoreScanner s = new StoreScanner(scan, scanInfo, null, memstorescanners)) {
while (s.next(result)) {
LOG.info(result);
// Assert the stuff is coming out in right order.
@ -225,14 +216,12 @@ public class TestDefaultMemStore {
if (count == snapshotIndex) {
MemStoreSnapshot snapshot = this.memstore.snapshot();
this.memstore.clearSnapshot(snapshot.getId());
// Added more rows into kvset. But the scanner wont see these rows.
// Added more rows into kvset. But the scanner wont see these rows.
addRows(this.memstore, ts);
LOG.info("Snapshotted, cleared it and then added values (which wont be seen)");
}
result.clear();
}
} finally {
s.close();
}
assertEquals(rowCount, count);
}
@ -600,27 +589,26 @@ public class TestDefaultMemStore {
//starting from each row, validate results should contain the starting row
Configuration conf = HBaseConfiguration.create();
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
ScanType scanType = ScanType.USER_SCAN;
try (InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null,
memstore.getScanners(0))) {
ScanInfo scanInfo =
new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
try (InternalScanner scanner =
new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
memstore.getScanners(0))) {
List<Cell> results = new ArrayList<>();
for (int i = 0; scanner.next(results); i++) {
int rowId = startRowId + i;
Cell left = results.get(0);
byte[] row1 = Bytes.toBytes(rowId);
assertTrue(
"Row name",
CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0);
assertTrue("Row name",
CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0);
assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
List<Cell> row = new ArrayList<>();
for (Cell kv : results) {
row.add(kv);
}
isExpectedRowWithoutTimestamps(rowId, row);
// Clear out set. Otherwise row results accumulate.
// Clear out set. Otherwise row results accumulate.
results.clear();
}
}

View File

@ -442,14 +442,11 @@ public class TestMobStoreCompaction {
List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, false,
HConstants.LATEST_TIMESTAMP);
Scan scan = new Scan();
scan.setMaxVersions(hcd.getMaxVersions());
long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
long ttl = HStore.determineTTLFromFamily(hcd);
ScanInfo scanInfo = new ScanInfo(copyOfConf, hcd, ttl, timeToPurgeDeletes,
CellComparator.COMPARATOR);
StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_DROP_DELETES, null,
scanners, 0L, HConstants.LATEST_TIMESTAMP);
StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_DROP_DELETES, scanners);
try {
size += UTIL.countRows(scanner);
} finally {

View File

@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -68,6 +66,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/**
* Test cases against ReversibleKeyValueScanner
*/
@ -263,7 +263,6 @@ public class TestReversibleScanners {
StoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
BloomType.NONE, true);
ScanType scanType = ScanType.USER_SCAN;
ScanInfo scanInfo =
new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR, false);
@ -271,16 +270,15 @@ public class TestReversibleScanners {
// Case 1.Test a full reversed scan
Scan scan = new Scan();
scan.setReversed(true);
StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2,
scan, scanType, scanInfo, MAXMVCC);
StoreScanner storeScanner =
getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
// Case 2.Test reversed scan with a specified start row
int startRowNum = ROWSIZE / 2;
byte[] startRow = ROWS[startRowNum];
scan.withStartRow(startRow);
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
scanType, scanInfo, MAXMVCC);
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
startRowNum + 1, false);
@ -289,16 +287,14 @@ public class TestReversibleScanners {
assertTrue(QUALSIZE > 2);
scan.addColumn(FAMILYNAME, QUALS[0]);
scan.addColumn(FAMILYNAME, QUALS[2]);
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
scanType, scanInfo, MAXMVCC);
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1,
false);
// Case 4.Test reversed scan with mvcc based on case 3
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
scanType, scanInfo, readPoint);
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint);
int expectedRowCount = 0;
int expectedKVCount = 0;
for (int i = startRowNum; i >= 0; i--) {
@ -423,7 +419,7 @@ public class TestReversibleScanners {
}
private StoreScanner getReversibleStoreScanner(MemStore memstore,
StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType,
StoreFile sf1, StoreFile sf2, Scan scan,
ScanInfo scanInfo, int readPoint) throws IOException {
List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null,
false, readPoint);
@ -434,7 +430,7 @@ public class TestReversibleScanners {
columns = entry.getValue();
}
StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo,
scanType, columns, scanners);
columns, scanners);
return storeScanner;
}

View File

@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -251,18 +252,17 @@ public class TestCoprocessorScanPolicy {
family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(), family.isNewVersionBehavior());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
return new StoreScanner(store, scanInfo, scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
return new StoreScanner(store, scanInfo,
newVersions == null ? OptionalInt.empty() : OptionalInt.of(newVersions.intValue()),
scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
}
@Override
public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s) throws IOException {
final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s) throws IOException {
Long newTtl = ttls.get(store.getTableName());
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
@ -270,11 +270,11 @@ public class TestCoprocessorScanPolicy {
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(), family.isNewVersionBehavior());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
return new StoreScanner(store, scanInfo, scan, scanners, scanType,
store.getSmallestReadPoint(), earliestPutTs);
family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(),
family.isNewVersionBehavior());
return new StoreScanner(store, scanInfo,
newVersions == null ? OptionalInt.empty() : OptionalInt.of(newVersions.intValue()),
scanners, scanType, store.getSmallestReadPoint(), earliestPutTs);
}
@Override