HBASE-14266 RegionServers have a lock contention of Configuration.getProps

This commit is contained in:
stack 2015-10-24 14:43:53 -07:00
parent df36aef23c
commit 996866d3ae
18 changed files with 207 additions and 173 deletions

View File

@ -36,11 +36,10 @@ import org.apache.hadoop.hbase.util.VersionInfo;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HBaseConfiguration extends Configuration {
private static final Log LOG = LogFactory.getLog(HBaseConfiguration.class);
/**
* Instantinating HBaseConfiguration() is deprecated. Please use
* Instantiating HBaseConfiguration() is deprecated. Please use
* HBaseConfiguration#create() to construct a plain Configuration
* @deprecated Please use create() instead.
*/

View File

@ -188,7 +188,7 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
}
long ttl = Math.max(EnvironmentEdgeManager.currentTime() -
Bytes.toLong(data), oldSI.getTtl());
return new ScanInfo(store.getFamily(), ttl,
return new ScanInfo(oldSI.getConfiguration(), store.getFamily(), ttl,
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
}

View File

@ -551,7 +551,7 @@ public class PartitionedMobCompactor extends MobCompactor {
Scan scan = new Scan();
scan.setMaxVersions(column.getMaxVersions());
long ttl = HStore.determineTTLFromFamily(column);
ScanInfo scanInfo = new ScanInfo(column, ttl, 0, CellComparator.COMPARATOR);
ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.COMPARATOR);
StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
HConstants.LATEST_TIMESTAMP);
return scanner;

View File

@ -232,7 +232,7 @@ public class HStore implements Store {
long ttl = determineTTLFromFamily(family);
// Why not just pass a HColumnDescriptor in here altogether? Even if have
// to clone it?
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });

View File

@ -19,15 +19,22 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import com.google.common.annotations.VisibleForTesting;
/**
* Immutable information for scans over a store.
*/
// Has to be public for PartitionedMobCompactor to access; ditto on tests making use of a few of
// the accessors below. Shutdown access. TODO
@VisibleForTesting
@InterfaceAudience.Private
public class ScanInfo {
private byte[] family;
@ -37,25 +44,32 @@ public class ScanInfo {
private KeepDeletedCells keepDeletedCells;
private long timeToPurgeDeletes;
private CellComparator comparator;
private long tableMaxRowSize;
private boolean usePread;
private long cellsPerTimeoutCheck;
private boolean parallelSeekEnabled;
private final Configuration conf;
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
+ (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
+ (4 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_BOOLEAN));
/**
* @param conf
* @param family {@link HColumnDescriptor} describing the column family
* @param ttl Store's TTL (in ms)
* @param timeToPurgeDeletes duration in ms after which a delete marker can
* be purged during a major compaction.
* @param comparator The store's comparator
*/
public ScanInfo(final HColumnDescriptor family, final long ttl, final long timeToPurgeDeletes,
final CellComparator comparator) {
this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
public ScanInfo(final Configuration conf, final HColumnDescriptor family, final long ttl,
final long timeToPurgeDeletes, final CellComparator comparator) {
this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
.getKeepDeletedCells(), timeToPurgeDeletes, comparator);
}
/**
* @param conf
* @param family Name of this store's column family
* @param minVersions Store's MIN_VERSIONS setting
* @param maxVersions Store's VERSIONS setting
@ -65,9 +79,9 @@ public class ScanInfo {
* @param keepDeletedCells Store's keepDeletedCells setting
* @param comparator The store's comparator
*/
public ScanInfo(final byte[] family, final int minVersions, final int maxVersions,
final long ttl, final KeepDeletedCells keepDeletedCells, final long timeToPurgeDeletes,
final CellComparator comparator) {
public ScanInfo(final Configuration conf, final byte[] family, final int minVersions,
final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells,
final long timeToPurgeDeletes, final CellComparator comparator) {
this.family = family;
this.minVersions = minVersions;
this.maxVersions = maxVersions;
@ -75,13 +89,44 @@ public class ScanInfo {
this.keepDeletedCells = keepDeletedCells;
this.timeToPurgeDeletes = timeToPurgeDeletes;
this.comparator = comparator;
this.tableMaxRowSize =
conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
this.usePread = conf.getBoolean("hbase.storescanner.use.pread", false);
long perHeartbeat =
conf.getLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
this.cellsPerTimeoutCheck = perHeartbeat > 0?
perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
this.parallelSeekEnabled =
conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
this.conf = conf;
}
public byte[] getFamily() {
public Configuration getConfiguration() {
return this.conf;
}
long getTableMaxRowSize() {
return this.tableMaxRowSize;
}
boolean isUsePread() {
return this.usePread;
}
long getCellsPerTimeoutCheck() {
return this.cellsPerTimeoutCheck;
}
boolean isParallelSeekEnabled() {
return this.parallelSeekEnabled;
}
byte[] getFamily() {
return family;
}
public int getMinVersions() {
int getMinVersions() {
return minVersions;
}
@ -93,7 +138,7 @@ public class ScanInfo {
return ttl;
}
public KeepDeletedCells getKeepDeletedCells() {
KeepDeletedCells getKeepDeletedCells() {
return keepDeletedCells;
}

View File

@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
@ -49,6 +48,8 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.annotations.VisibleForTesting;
/**
* Scanner scans both the memstore and the Store. Coalesce KeyValue stream
* into List<KeyValue> for a single row.
@ -57,7 +58,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
private static final Log LOG = LogFactory.getLog(StoreScanner.class);
protected Store store;
// In unit tests, the store could be null
protected final Store store;
protected ScanQueryMatcher matcher;
protected KeyValueHeap heap;
protected boolean cacheBlocks;
@ -69,13 +71,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Used to indicate that the scanner has closed (see HBASE-1107)
// Doesnt need to be volatile because it's always accessed via synchronized methods
protected boolean closing = false;
protected final boolean isGet;
protected final boolean get;
protected final boolean explicitColumnQuery;
protected final boolean useRowColBloom;
/**
* A flag that enables StoreFileScanner parallel-seeking
*/
protected boolean isParallelSeekEnabled = false;
protected boolean parallelSeekEnabled = false;
protected ExecutorService executor;
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
@ -135,56 +137,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
/** An internal constructor. */
protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
this.readPt = readPt;
this.store = store;
this.cacheBlocks = cacheBlocks;
isGet = scan.isGetScan();
get = scan.isGetScan();
int numCol = columns == null ? 0 : columns.size();
explicitColumnQuery = numCol > 0;
this.scan = scan;
this.columns = columns;
this.now = EnvironmentEdgeManager.currentTime();
this.oldestUnexpiredTS = now - ttl;
this.minVersions = minVersions;
this.oldestUnexpiredTS = now - scanInfo.getTtl();
this.minVersions = scanInfo.getMinVersions();
if (store != null && ((HStore)store).getHRegion() != null
&& ((HStore)store).getHRegion().getBaseConf() != null) {
Configuration conf = ((HStore) store).getHRegion().getBaseConf();
this.maxRowSize =
conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
// We look up row-column Bloom filters for multi-column queries as part of
// the seek operation. However, we also look the row-column Bloom filter
// for multi-row (non-"get") scans because this is not done in
// StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
long tmpCellsPerTimeoutCheck =
conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
this.cellsPerHeartbeatCheck =
tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
: DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
} else {
this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
this.scanUsePread = scan.isSmall();
this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
}
// We look up row-column Bloom filters for multi-column queries as part of
// the seek operation. However, we also look the row-column Bloom filter
// for multi-row (non-"get") scans because this is not done in
// StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
// The parallel-seeking is on :
// 1) the config value is *true*
// 2) store has more than one store file
if (store != null && ((HStore)store).getHRegion() != null
&& store.getStorefilesCount() > 1) {
RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
if (rsService == null || !rsService.getConfiguration().getBoolean(
STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
isParallelSeekEnabled = true;
executor = rsService.getExecutorService();
}
this.maxRowSize = scanInfo.getTableMaxRowSize();
this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
// Parallel seeking is on if the config allows and more there is more than one store file.
if (this.store != null && this.store.getStorefilesCount() > 1) {
RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
if (rsService != null && scanInfo.isParallelSeekEnabled()) {
this.parallelSeekEnabled = true;
this.executor = rsService.getExecutorService();
}
}
}
/**
@ -198,12 +181,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
*/
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
long readPt)
throws IOException {
this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
throws IOException {
this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException(
"Cannot specify any column for a raw scan");
throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
}
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
@ -219,7 +200,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Always check bloom filter to optimize the top row seek for delete
// family marker.
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
&& lazySeekEnabledGlobally, isParallelSeekEnabled);
&& lazySeekEnabledGlobally, parallelSeekEnabled);
// set storeLimit
this.storeLimit = scan.getMaxResultsPerColumnFamily();
@ -268,8 +249,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
this(store, scan, scanInfo, null,
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false);
if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
@ -282,13 +263,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
scanners = selectScannersFrom(scanners);
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
}
/** Constructor for testing. */
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners) throws IOException {
@ -298,7 +279,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
0);
}
// Constructor for testing.
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs)
@ -311,9 +292,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
public StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
throws IOException {
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
throws IOException {
this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
@ -322,7 +302,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.store.addChangedReaderObserver(this);
}
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
resetKVHeap(scanners, scanInfo.getComparator());
}
@ -332,8 +312,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
*/
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
final boolean isCompaction = false;
boolean usePread = isGet || scanUsePread;
return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
boolean usePread = get || scanUsePread;
return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
}
@ -452,7 +432,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return;
}
if (withHeapClose) this.closing = true;
// under test, we dont have a this.store
// Under test, we dont have a this.store
if (this.store != null) this.store.deleteChangedReaderObserver(this);
if (withHeapClose) {
for (KeyValueHeap h : this.heapsForDelayedClose) {
@ -501,67 +481,65 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
lock.lock();
try {
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
}
if (checkReseek()) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
// if the heap was left null, then the scanners had previously run out anyways, close and
// return.
if (this.heap == null) {
// By this time partial close should happened because already heap is null
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
Cell cell = this.heap.peek();
if (cell == null) {
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// only call setRow if the row changes; avoids confusing the query matcher
// if scanning intra-row
// If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
// rows. Else it is possible we are still traversing the same row so we must perform the row
// comparison.
if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
!CellUtil.matchingRow(cell, matcher.curCell)) {
this.countPerRow = 0;
matcher.setToNewRow(cell);
}
// Clear progress away unless invoker has indicated it should be kept.
if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
// Only do a sanity-check if store and comparator are available.
CellComparator comparator =
store != null ? store.getComparator() : null;
int count = 0;
long totalBytesRead = 0;
LOOP: do {
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
}
if (checkReseek()) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
checkScanOrder(prevCell, cell, comparator);
prevCell = cell;
// if the heap was left null, then the scanners had previously run out anyways, close and
// return.
if (this.heap == null) {
// By this time partial close should happened because already heap is null
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
qcode = optimize(qcode, cell);
switch(qcode) {
Cell cell = this.heap.peek();
if (cell == null) {
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// only call setRow if the row changes; avoids confusing the query matcher
// if scanning intra-row
// If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
// rows. Else it is possible we are still traversing the same row so we must perform the row
// comparison.
if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
!CellUtil.matchingRow(cell, matcher.curCell)) {
this.countPerRow = 0;
matcher.setToNewRow(cell);
}
// Clear progress away unless invoker has indicated it should be kept.
if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
// Only do a sanity-check if store and comparator are available.
CellComparator comparator = store != null ? store.getComparator() : null;
int count = 0;
long totalBytesRead = 0;
LOOP: do {
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
}
if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
checkScanOrder(prevCell, cell, comparator);
prevCell = cell;
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
qcode = optimize(qcode, cell);
switch(qcode) {
case INCLUDE:
case INCLUDE_AND_SEEK_NEXT_ROW:
case INCLUDE_AND_SEEK_NEXT_COL:
@ -597,7 +575,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (totalBytesRead > maxRowSize) {
throw new RowTooBigException("Max row size allowed: " + maxRowSize
+ ", but the row is bigger than that.");
+ ", but the row is bigger than that.");
}
}
@ -656,16 +634,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
default:
throw new RuntimeException("UNEXPECTED");
}
} while((cell = this.heap.peek()) != null);
if (count > 0) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
} while((cell = this.heap.peek()) != null);
if (count > 0) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
// No more keys
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
// No more keys
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} finally {
lock.unlock();
}
@ -764,7 +742,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
List<KeyValueScanner> scanners = getScannersNoCompaction();
// Seek all scanners to the initial key
seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());

View File

@ -391,7 +391,7 @@ public class TestPartitionedMobCompactor {
scan.setMaxVersions(hcd.getMaxVersions());
long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
long ttl = HStore.determineTTLFromFamily(hcd);
ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, CellComparator.COMPARATOR);
ScanInfo scanInfo = new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparator.COMPARATOR);
StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
scanners, 0L, HConstants.LATEST_TIMESTAMP);
List<Cell> results = new ArrayList<>();

View File

@ -45,7 +45,7 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver {
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
ScanInfo oldSI = store.getScanInfo();
ScanInfo scanInfo = new ScanInfo(store.getFamily(), oldSI.getTtl(),
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getFamily(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions());
@ -62,7 +62,7 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver {
InternalScanner s) throws IOException {
// this demonstrates how to override the scanners default behavior
ScanInfo oldSI = store.getScanInfo();
ScanInfo scanInfo = new ScanInfo(store.getFamily(), oldSI.getTtl(),
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getFamily(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions());

View File

@ -195,7 +195,7 @@ public class TestCompaction {
for (Store hstore: this.r.stores.values()) {
HStore store = (HStore)hstore;
ScanInfo old = store.getScanInfo();
ScanInfo si = new ScanInfo(old.getFamily(),
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator());
store.setScanInfo(si);

View File

@ -312,7 +312,7 @@ public class TestDefaultCompactSelection extends TestCase {
public void testCompactionEmptyHFile() throws IOException {
// Set TTL
ScanInfo oldScanInfo = store.getScanInfo();
ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getFamily(),
ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(),
oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600,
oldScanInfo.getKeepDeletedCells(), oldScanInfo.getTimeToPurgeDeletes(),
oldScanInfo.getComparator());

View File

@ -102,8 +102,9 @@ public class TestDefaultMemStore extends TestCase {
List<KeyValueScanner> memstorescanners = this.memstore.getScanners(0);
Scan scan = new Scan();
List<Cell> result = new ArrayList<Cell>();
Configuration conf = HBaseConfiguration.create();
ScanInfo scanInfo =
new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0,
new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0,
this.memstore.comparator);
ScanType scanType = ScanType.USER_SCAN;
StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
@ -522,9 +523,10 @@ public class TestDefaultMemStore extends TestCase {
}
}
//starting from each row, validate results should contain the starting row
Configuration conf = HBaseConfiguration.create();
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
ScanInfo scanInfo = new ScanInfo(FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE,
0, this.memstore.comparator);
ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
KeepDeletedCells.FALSE, 0, this.memstore.comparator);
ScanType scanType = ScanType.USER_SCAN;
InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null,

View File

@ -286,7 +286,7 @@ public class TestMajorCompaction {
for (Store hstore : r.getStores()) {
HStore store = ((HStore) hstore);
ScanInfo old = store.getScanInfo();
ScanInfo si = new ScanInfo(old.getFamily(),
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator());
store.setScanInfo(si);

View File

@ -442,7 +442,8 @@ public class TestMobStoreCompaction {
scan.setMaxVersions(hcd.getMaxVersions());
long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
long ttl = HStore.determineTTLFromFamily(hcd);
ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, CellComparator.COMPARATOR);
ScanInfo scanInfo = new ScanInfo(copyOfConf, hcd, ttl, timeToPurgeDeletes,
CellComparator.COMPARATOR);
StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_DROP_DELETES, null,
scanners, 0L, HConstants.LATEST_TIMESTAMP);
List<Cell> results = new ArrayList<>();

View File

@ -27,7 +27,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
@ -99,7 +101,7 @@ public class TestQueryMatcher extends HBaseTestCase {
private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
// 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(this.conf, fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
now - ttl, now);
@ -164,7 +166,7 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.DONE);
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(this.conf, fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
now - ttl, now);
@ -220,8 +222,9 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm =
new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
new ScanQueryMatcher(scan,
new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@ -274,7 +277,7 @@ public class TestQueryMatcher extends HBaseTestCase {
};
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(this.conf, fam2,
0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
now - testTTL, now);
@ -331,7 +334,8 @@ public class TestQueryMatcher extends HBaseTestCase {
byte[] from, byte[] to, byte[][] rows, MatchCode... expected) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
// Set time to purge deletes to negative value to avoid it ever happening.
ScanInfo scanInfo = new ScanInfo(fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L, rowComparator);
ScanInfo scanInfo =
new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L, rowComparator);
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,

View File

@ -253,7 +253,7 @@ public class TestReversibleScanners {
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
ScanType scanType = ScanType.USER_SCAN;
ScanInfo scanInfo = new ScanInfo(FAMILYNAME, 0, Integer.MAX_VALUE,
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE,
Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR);
// Case 1.Test a full reversed scan

View File

@ -30,8 +30,10 @@ import java.util.TreeSet;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
@ -49,7 +51,8 @@ import org.junit.experimental.categories.Category;
public class TestStoreScanner extends TestCase {
private static final String CF_STR = "cf";
final byte [] CF = Bytes.toBytes(CF_STR);
private ScanInfo scanInfo = new ScanInfo(CF, 0, Integer.MAX_VALUE,
static Configuration CONF = HBaseConfiguration.create();
private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE,
Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR);
private ScanType scanType = ScanType.USER_SCAN;
@ -418,7 +421,7 @@ public class TestStoreScanner extends TestCase {
List<KeyValueScanner> scanners = scanFixture(kvs);
Scan scan = new Scan();
scan.setMaxVersions(1);
ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
CellComparator.COMPARATOR);
ScanType scanType = ScanType.USER_SCAN;
StoreScanner scanner =
@ -489,7 +492,7 @@ public class TestStoreScanner extends TestCase {
Scan scan = new Scan();
scan.setMaxVersions(1);
// scanner with ttl equal to 500
ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
CellComparator.COMPARATOR);
ScanType scanType = ScanType.USER_SCAN;
StoreScanner scanner =
@ -549,7 +552,7 @@ public class TestStoreScanner extends TestCase {
List<KeyValueScanner> scanners = scanFixture(kvs);
Scan scan = new Scan();
scan.setMaxVersions(2);
ScanInfo scanInfo = new ScanInfo(Bytes.toBytes("cf"),
ScanInfo scanInfo = new ScanInfo(CONF, Bytes.toBytes("cf"),
0 /* minVersions */,
2 /* maxVersions */, 500 /* ttl */,
KeepDeletedCells.FALSE /* keepDeletedCells */,

View File

@ -189,7 +189,7 @@ public class TestStripeCompactor {
// Create store mock that is satisfactory for compactor.
HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
ScanInfo si = new ScanInfo(col, Long.MAX_VALUE, 0, CellComparator.COMPARATOR);
ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparator.COMPARATOR);
Store store = mock(Store.class);
when(store.getFamily()).thenReturn(col);
when(store.getScanInfo()).thenReturn(si);

View File

@ -250,7 +250,8 @@ public class TestCoprocessorScanPolicy {
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
family.getName(), family.getMinVersions(),
newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
@ -270,7 +271,8 @@ public class TestCoprocessorScanPolicy {
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
family.getName(), family.getMinVersions(),
newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
@ -290,7 +292,8 @@ public class TestCoprocessorScanPolicy {
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
family.getName(), family.getMinVersions(),
newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
@ -301,5 +304,4 @@ public class TestCoprocessorScanPolicy {
}
}
}
}