HBASE-14266 RegionServers have a lock contention of Configuration.getProps
This commit is contained in:
parent
91bca7323a
commit
4f8387959a
|
@ -36,11 +36,10 @@ import org.apache.hadoop.hbase.util.VersionInfo;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public class HBaseConfiguration extends Configuration {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HBaseConfiguration.class);
|
||||
|
||||
/**
|
||||
* Instantinating HBaseConfiguration() is deprecated. Please use
|
||||
* Instantiating HBaseConfiguration() is deprecated. Please use
|
||||
* HBaseConfiguration#create() to construct a plain Configuration
|
||||
*/
|
||||
@Deprecated
|
||||
|
|
|
@ -188,7 +188,7 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
|
|||
}
|
||||
long ttl = Math.max(EnvironmentEdgeManager.currentTime() -
|
||||
Bytes.toLong(data), oldSI.getTtl());
|
||||
return new ScanInfo(store.getFamily(), ttl,
|
||||
return new ScanInfo(oldSI.getConfiguration(), store.getFamily(), ttl,
|
||||
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
|
||||
}
|
||||
|
||||
|
|
|
@ -233,7 +233,7 @@ public class HStore implements Store {
|
|||
long ttl = determineTTLFromFamily(family);
|
||||
// Why not just pass a HColumnDescriptor in here altogether? Even if have
|
||||
// to clone it?
|
||||
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
|
||||
scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
|
||||
String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
|
||||
this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
|
||||
Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
|
||||
|
|
|
@ -24,10 +24,16 @@ import org.apache.hadoop.hbase.KeepDeletedCells;
|
|||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Immutable information for scans over a store.
|
||||
*/
|
||||
// Has to be public for PartitionedMobCompactor to access; ditto on tests making use of a few of
|
||||
// the accessors below. Shutdown access. TODO
|
||||
@VisibleForTesting
|
||||
@InterfaceAudience.Private
|
||||
public class ScanInfo {
|
||||
private byte[] family;
|
||||
|
@ -37,25 +43,32 @@ public class ScanInfo {
|
|||
private KeepDeletedCells keepDeletedCells;
|
||||
private long timeToPurgeDeletes;
|
||||
private KVComparator comparator;
|
||||
private long tableMaxRowSize;
|
||||
private boolean usePread;
|
||||
private long cellsPerTimeoutCheck;
|
||||
private boolean parallelSeekEnabled;
|
||||
private final Configuration conf;
|
||||
|
||||
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
|
||||
+ (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
|
||||
+ (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
|
||||
+ (4 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_BOOLEAN));
|
||||
|
||||
/**
|
||||
* @param conf
|
||||
* @param family {@link HColumnDescriptor} describing the column family
|
||||
* @param ttl Store's TTL (in ms)
|
||||
* @param timeToPurgeDeletes duration in ms after which a delete marker can
|
||||
* be purged during a major compaction.
|
||||
* @param comparator The store's comparator
|
||||
*/
|
||||
public ScanInfo(final HColumnDescriptor family, final long ttl, final long timeToPurgeDeletes,
|
||||
final KVComparator comparator) {
|
||||
this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
|
||||
public ScanInfo(final Configuration conf, final HColumnDescriptor family, final long ttl,
|
||||
final long timeToPurgeDeletes, final KVComparator comparator) {
|
||||
this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
|
||||
.getKeepDeletedCells(), timeToPurgeDeletes, comparator);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf
|
||||
* @param family Name of this store's column family
|
||||
* @param minVersions Store's MIN_VERSIONS setting
|
||||
* @param maxVersions Store's VERSIONS setting
|
||||
|
@ -65,9 +78,9 @@ public class ScanInfo {
|
|||
* @param keepDeletedCells Store's keepDeletedCells setting
|
||||
* @param comparator The store's comparator
|
||||
*/
|
||||
public ScanInfo(final byte[] family, final int minVersions, final int maxVersions,
|
||||
final long ttl, final KeepDeletedCells keepDeletedCells, final long timeToPurgeDeletes,
|
||||
final KVComparator comparator) {
|
||||
public ScanInfo(final Configuration conf, final byte[] family, final int minVersions,
|
||||
final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells,
|
||||
final long timeToPurgeDeletes, final KVComparator comparator) {
|
||||
this.family = family;
|
||||
this.minVersions = minVersions;
|
||||
this.maxVersions = maxVersions;
|
||||
|
@ -75,13 +88,44 @@ public class ScanInfo {
|
|||
this.keepDeletedCells = keepDeletedCells;
|
||||
this.timeToPurgeDeletes = timeToPurgeDeletes;
|
||||
this.comparator = comparator;
|
||||
this.tableMaxRowSize =
|
||||
conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
|
||||
this.usePread = conf.getBoolean("hbase.storescanner.use.pread", false);
|
||||
long perHeartbeat =
|
||||
conf.getLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
|
||||
StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
|
||||
this.cellsPerTimeoutCheck = perHeartbeat > 0?
|
||||
perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
|
||||
this.parallelSeekEnabled =
|
||||
conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
public byte[] getFamily() {
|
||||
public Configuration getConfiguration() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
long getTableMaxRowSize() {
|
||||
return this.tableMaxRowSize;
|
||||
}
|
||||
|
||||
boolean isUsePread() {
|
||||
return this.usePread;
|
||||
}
|
||||
|
||||
long getCellsPerTimeoutCheck() {
|
||||
return this.cellsPerTimeoutCheck;
|
||||
}
|
||||
|
||||
boolean isParallelSeekEnabled() {
|
||||
return this.parallelSeekEnabled;
|
||||
}
|
||||
|
||||
byte[] getFamily() {
|
||||
return family;
|
||||
}
|
||||
|
||||
public int getMinVersions() {
|
||||
int getMinVersions() {
|
||||
return minVersions;
|
||||
}
|
||||
|
||||
|
@ -93,7 +137,7 @@ public class ScanInfo {
|
|||
return ttl;
|
||||
}
|
||||
|
||||
public KeepDeletedCells getKeepDeletedCells() {
|
||||
KeepDeletedCells getKeepDeletedCells() {
|
||||
return keepDeletedCells;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
|
@ -49,6 +48,8 @@ import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Scanner scans both the memstore and the Store. Coalesce KeyValue stream
|
||||
* into List<KeyValue> for a single row.
|
||||
|
@ -57,7 +58,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
|
||||
private static final Log LOG = LogFactory.getLog(StoreScanner.class);
|
||||
protected Store store;
|
||||
// In unit tests, the store could be null
|
||||
protected final Store store;
|
||||
protected ScanQueryMatcher matcher;
|
||||
protected KeyValueHeap heap;
|
||||
protected boolean cacheBlocks;
|
||||
|
@ -69,13 +71,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
// Used to indicate that the scanner has closed (see HBASE-1107)
|
||||
// Doesnt need to be volatile because it's always accessed via synchronized methods
|
||||
protected boolean closing = false;
|
||||
protected final boolean isGet;
|
||||
protected final boolean get;
|
||||
protected final boolean explicitColumnQuery;
|
||||
protected final boolean useRowColBloom;
|
||||
/**
|
||||
* A flag that enables StoreFileScanner parallel-seeking
|
||||
*/
|
||||
protected boolean isParallelSeekEnabled = false;
|
||||
protected boolean parallelSeekEnabled = false;
|
||||
protected ExecutorService executor;
|
||||
protected final Scan scan;
|
||||
protected final NavigableSet<byte[]> columns;
|
||||
|
@ -129,58 +131,39 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
AFTER_SEEK,
|
||||
COMPACT_COMPLETE
|
||||
}
|
||||
|
||||
|
||||
/** An internal constructor. */
|
||||
protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
|
||||
final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
|
||||
protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
|
||||
final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
|
||||
this.readPt = readPt;
|
||||
this.store = store;
|
||||
this.cacheBlocks = cacheBlocks;
|
||||
isGet = scan.isGetScan();
|
||||
get = scan.isGetScan();
|
||||
int numCol = columns == null ? 0 : columns.size();
|
||||
explicitColumnQuery = numCol > 0;
|
||||
this.scan = scan;
|
||||
this.columns = columns;
|
||||
this.now = EnvironmentEdgeManager.currentTime();
|
||||
this.oldestUnexpiredTS = now - ttl;
|
||||
this.minVersions = minVersions;
|
||||
this.oldestUnexpiredTS = now - scanInfo.getTtl();
|
||||
this.minVersions = scanInfo.getMinVersions();
|
||||
|
||||
if (store != null && ((HStore)store).getHRegion() != null
|
||||
&& ((HStore)store).getHRegion().getBaseConf() != null) {
|
||||
Configuration conf = ((HStore) store).getHRegion().getBaseConf();
|
||||
this.maxRowSize =
|
||||
conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
|
||||
this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
|
||||
// We look up row-column Bloom filters for multi-column queries as part of
|
||||
// the seek operation. However, we also look the row-column Bloom filter
|
||||
// for multi-row (non-"get") scans because this is not done in
|
||||
// StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
|
||||
this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
|
||||
|
||||
long tmpCellsPerTimeoutCheck =
|
||||
conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
|
||||
DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
|
||||
this.cellsPerHeartbeatCheck =
|
||||
tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
|
||||
: DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
|
||||
} else {
|
||||
this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
|
||||
this.scanUsePread = scan.isSmall();
|
||||
this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
|
||||
}
|
||||
|
||||
// We look up row-column Bloom filters for multi-column queries as part of
|
||||
// the seek operation. However, we also look the row-column Bloom filter
|
||||
// for multi-row (non-"get") scans because this is not done in
|
||||
// StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
|
||||
useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
|
||||
|
||||
// The parallel-seeking is on :
|
||||
// 1) the config value is *true*
|
||||
// 2) store has more than one store file
|
||||
if (store != null && ((HStore)store).getHRegion() != null
|
||||
&& store.getStorefilesCount() > 1) {
|
||||
RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
|
||||
if (rsService == null || !rsService.getConfiguration().getBoolean(
|
||||
STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
|
||||
isParallelSeekEnabled = true;
|
||||
executor = rsService.getExecutorService();
|
||||
}
|
||||
this.maxRowSize = scanInfo.getTableMaxRowSize();
|
||||
this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
|
||||
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
|
||||
// Parallel seeking is on if the config allows and more there is more than one store file.
|
||||
if (this.store != null && this.store.getStorefilesCount() > 1) {
|
||||
RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
|
||||
if (rsService != null && scanInfo.isParallelSeekEnabled()) {
|
||||
this.parallelSeekEnabled = true;
|
||||
this.executor = rsService.getExecutorService();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -194,12 +177,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
*/
|
||||
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
|
||||
long readPt)
|
||||
throws IOException {
|
||||
this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
|
||||
scanInfo.getMinVersions(), readPt);
|
||||
throws IOException {
|
||||
this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
|
||||
if (columns != null && scan.isRaw()) {
|
||||
throw new DoNotRetryIOException(
|
||||
"Cannot specify any column for a raw scan");
|
||||
throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
|
||||
}
|
||||
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
|
||||
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
|
||||
|
@ -215,7 +196,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
// Always check bloom filter to optimize the top row seek for delete
|
||||
// family marker.
|
||||
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
|
||||
&& lazySeekEnabledGlobally, isParallelSeekEnabled);
|
||||
&& lazySeekEnabledGlobally, parallelSeekEnabled);
|
||||
|
||||
// set storeLimit
|
||||
this.storeLimit = scan.getMaxResultsPerColumnFamily();
|
||||
|
@ -264,8 +245,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
|
||||
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
|
||||
long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
|
||||
this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
|
||||
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
|
||||
this(store, scan, scanInfo, null,
|
||||
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false);
|
||||
if (dropDeletesFromRow == null) {
|
||||
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
|
||||
earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
|
||||
|
@ -278,13 +259,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
scanners = selectScannersFrom(scanners);
|
||||
|
||||
// Seek all scanners to the initial key
|
||||
seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
|
||||
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
|
||||
|
||||
// Combine all seeked scanners with a heap
|
||||
resetKVHeap(scanners, store.getComparator());
|
||||
}
|
||||
|
||||
/** Constructor for testing. */
|
||||
@VisibleForTesting
|
||||
StoreScanner(final Scan scan, ScanInfo scanInfo,
|
||||
ScanType scanType, final NavigableSet<byte[]> columns,
|
||||
final List<KeyValueScanner> scanners) throws IOException {
|
||||
|
@ -294,7 +275,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
0);
|
||||
}
|
||||
|
||||
// Constructor for testing.
|
||||
@VisibleForTesting
|
||||
StoreScanner(final Scan scan, ScanInfo scanInfo,
|
||||
ScanType scanType, final NavigableSet<byte[]> columns,
|
||||
final List<KeyValueScanner> scanners, long earliestPutTs)
|
||||
|
@ -307,9 +288,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
private StoreScanner(final Scan scan, ScanInfo scanInfo,
|
||||
ScanType scanType, final NavigableSet<byte[]> columns,
|
||||
final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
|
||||
throws IOException {
|
||||
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
|
||||
scanInfo.getMinVersions(), readPt);
|
||||
throws IOException {
|
||||
this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
|
||||
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
|
||||
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
|
||||
|
||||
|
@ -318,7 +298,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
this.store.addChangedReaderObserver(this);
|
||||
}
|
||||
// Seek all scanners to the initial key
|
||||
seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
|
||||
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
|
||||
resetKVHeap(scanners, scanInfo.getComparator());
|
||||
}
|
||||
|
||||
|
@ -328,8 +308,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
*/
|
||||
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
|
||||
final boolean isCompaction = false;
|
||||
boolean usePread = isGet || scanUsePread;
|
||||
return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
|
||||
boolean usePread = get || scanUsePread;
|
||||
return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
|
||||
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
|
||||
}
|
||||
|
||||
|
@ -442,7 +422,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
try {
|
||||
if (this.closing) return;
|
||||
this.closing = true;
|
||||
// under test, we dont have a this.store
|
||||
// Under test, we dont have a this.store
|
||||
if (this.store != null)
|
||||
this.store.deleteChangedReaderObserver(this);
|
||||
if (this.heap != null)
|
||||
|
@ -579,7 +559,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
|
||||
if (totalBytesRead > maxRowSize) {
|
||||
throw new RowTooBigException("Max row size allowed: " + maxRowSize
|
||||
+ ", but the row is bigger than that.");
|
||||
+ ", but the row is bigger than that.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -747,7 +727,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
List<KeyValueScanner> scanners = getScannersNoCompaction();
|
||||
|
||||
// Seek all scanners to the initial key
|
||||
seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
|
||||
seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
|
||||
|
||||
// Combine all seeked scanners with a heap
|
||||
resetKVHeap(scanners, store.getComparator());
|
||||
|
|
|
@ -45,7 +45,7 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver {
|
|||
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
|
||||
ScanInfo oldSI = store.getScanInfo();
|
||||
ScanInfo scanInfo = new ScanInfo(store.getFamily(), oldSI.getTtl(),
|
||||
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getFamily(), oldSI.getTtl(),
|
||||
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(oldSI.getMaxVersions());
|
||||
|
@ -62,7 +62,7 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver {
|
|||
InternalScanner s) throws IOException {
|
||||
// this demonstrates how to override the scanners default behavior
|
||||
ScanInfo oldSI = store.getScanInfo();
|
||||
ScanInfo scanInfo = new ScanInfo(store.getFamily(), oldSI.getTtl(),
|
||||
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getFamily(), oldSI.getTtl(),
|
||||
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(oldSI.getMaxVersions());
|
||||
|
|
|
@ -197,7 +197,7 @@ public class TestCompaction {
|
|||
for (Store hstore: this.r.stores.values()) {
|
||||
HStore store = (HStore)hstore;
|
||||
ScanInfo old = store.getScanInfo();
|
||||
ScanInfo si = new ScanInfo(old.getFamily(),
|
||||
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
|
||||
old.getMinVersions(), old.getMaxVersions(), ttl,
|
||||
old.getKeepDeletedCells(), 0, old.getComparator());
|
||||
store.setScanInfo(si);
|
||||
|
|
|
@ -309,7 +309,7 @@ public class TestDefaultCompactSelection extends TestCase {
|
|||
public void testCompactionEmptyHFile() throws IOException {
|
||||
// Set TTL
|
||||
ScanInfo oldScanInfo = store.getScanInfo();
|
||||
ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getFamily(),
|
||||
ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(),
|
||||
oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600,
|
||||
oldScanInfo.getKeepDeletedCells(), oldScanInfo.getTimeToPurgeDeletes(),
|
||||
oldScanInfo.getComparator());
|
||||
|
|
|
@ -97,8 +97,9 @@ public class TestDefaultMemStore extends TestCase {
|
|||
List<KeyValueScanner> memstorescanners = this.memstore.getScanners(0);
|
||||
Scan scan = new Scan();
|
||||
List<Cell> result = new ArrayList<Cell>();
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
ScanInfo scanInfo =
|
||||
new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0,
|
||||
new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0,
|
||||
this.memstore.comparator);
|
||||
ScanType scanType = ScanType.USER_SCAN;
|
||||
StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
|
||||
|
@ -517,9 +518,10 @@ public class TestDefaultMemStore extends TestCase {
|
|||
}
|
||||
}
|
||||
//starting from each row, validate results should contain the starting row
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
|
||||
ScanInfo scanInfo = new ScanInfo(FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE,
|
||||
0, this.memstore.comparator);
|
||||
ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
|
||||
KeepDeletedCells.FALSE, 0, this.memstore.comparator);
|
||||
ScanType scanType = ScanType.USER_SCAN;
|
||||
InternalScanner scanner = new StoreScanner(new Scan(
|
||||
Bytes.toBytes(startRowId)), scanInfo, scanType, null,
|
||||
|
|
|
@ -285,7 +285,7 @@ public class TestMajorCompaction {
|
|||
for (Store hstore : r.getStores()) {
|
||||
HStore store = ((HStore) hstore);
|
||||
ScanInfo old = store.getScanInfo();
|
||||
ScanInfo si = new ScanInfo(old.getFamily(),
|
||||
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
|
||||
old.getMinVersions(), old.getMaxVersions(), ttl,
|
||||
old.getKeepDeletedCells(), 0, old.getComparator());
|
||||
store.setScanInfo(si);
|
||||
|
|
|
@ -27,6 +27,8 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestCase;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
|
@ -94,7 +96,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
|||
private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
// 2,4,5
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(this.conf, fam2,
|
||||
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
|
||||
now - ttl, now);
|
||||
|
||||
|
@ -157,7 +159,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
|||
expected.add(ScanQueryMatcher.MatchCode.DONE);
|
||||
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(this.conf, fam2,
|
||||
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
|
||||
now - ttl, now);
|
||||
|
||||
|
@ -212,8 +214,9 @@ public class TestQueryMatcher extends HBaseTestCase {
|
|||
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
ScanQueryMatcher qm =
|
||||
new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
|
||||
rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
|
||||
new ScanQueryMatcher(scan,
|
||||
new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
|
||||
rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
|
||||
|
||||
KeyValue [] kvs = new KeyValue[] {
|
||||
new KeyValue(row1, fam2, col1, now-100, data),
|
||||
|
@ -265,7 +268,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
|||
};
|
||||
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(this.conf, fam2,
|
||||
0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
|
||||
now - testTTL, now);
|
||||
|
||||
|
@ -321,7 +324,8 @@ public class TestQueryMatcher extends HBaseTestCase {
|
|||
byte[] from, byte[] to, byte[][] rows, MatchCode... expected) throws IOException {
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
// Set time to purge deletes to negative value to avoid it ever happening.
|
||||
ScanInfo scanInfo = new ScanInfo(fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L, rowComparator);
|
||||
ScanInfo scanInfo =
|
||||
new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L, rowComparator);
|
||||
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
|
||||
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
|
||||
|
|
|
@ -1,708 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
import org.apache.hadoop.hbase.filter.FilterList.Operator;
|
||||
import org.apache.hadoop.hbase.filter.PageFilter;
|
||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
/**
|
||||
* Test cases against ReversibleKeyValueScanner
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestReversibleScanners {
|
||||
private static final Log LOG = LogFactory.getLog(TestReversibleScanners.class);
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
|
||||
private static long TS = System.currentTimeMillis();
|
||||
private static int MAXMVCC = 7;
|
||||
private static byte[] ROW = Bytes.toBytes("testRow");
|
||||
private static final int ROWSIZE = 200;
|
||||
private static byte[][] ROWS = makeN(ROW, ROWSIZE);
|
||||
private static byte[] QUAL = Bytes.toBytes("testQual");
|
||||
private static final int QUALSIZE = 5;
|
||||
private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
|
||||
private static byte[] VALUE = Bytes.toBytes("testValue");
|
||||
private static final int VALUESIZE = 3;
|
||||
private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
|
||||
|
||||
@Test
|
||||
public void testReversibleStoreFileScanner() throws IOException {
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
Path hfilePath = new Path(new Path(
|
||||
TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"),
|
||||
"regionname"), "familyname");
|
||||
CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
|
||||
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
|
||||
HFileContextBuilder hcBuilder = new HFileContextBuilder();
|
||||
hcBuilder.withBlockSize(2 * 1024);
|
||||
hcBuilder.withDataBlockEncoding(encoding);
|
||||
HFileContext hFileContext = hcBuilder.build();
|
||||
StoreFile.Writer writer = new StoreFile.WriterBuilder(
|
||||
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(hfilePath)
|
||||
.withFileContext(hFileContext).build();
|
||||
writeStoreFile(writer);
|
||||
|
||||
StoreFile sf = new StoreFile(fs, writer.getPath(),
|
||||
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
|
||||
|
||||
List<StoreFileScanner> scanners = StoreFileScanner
|
||||
.getScannersForStoreFiles(Collections.singletonList(sf),
|
||||
false, true, false, false, Long.MAX_VALUE);
|
||||
StoreFileScanner scanner = scanners.get(0);
|
||||
seekTestOfReversibleKeyValueScanner(scanner);
|
||||
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
|
||||
LOG.info("Setting read point to " + readPoint);
|
||||
scanners = StoreFileScanner.getScannersForStoreFiles(
|
||||
Collections.singletonList(sf), false, true, false, false, readPoint);
|
||||
seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReversibleMemstoreScanner() throws IOException {
|
||||
MemStore memstore = new DefaultMemStore();
|
||||
writeMemstore(memstore);
|
||||
List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
|
||||
seekTestOfReversibleKeyValueScanner(scanners.get(0));
|
||||
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
|
||||
LOG.info("Setting read point to " + readPoint);
|
||||
scanners = memstore.getScanners(readPoint);
|
||||
seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReversibleKeyValueHeap() throws IOException {
|
||||
// write data to one memstore and two store files
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
Path hfilePath = new Path(new Path(
|
||||
TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"),
|
||||
"familyname");
|
||||
CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
|
||||
HFileContextBuilder hcBuilder = new HFileContextBuilder();
|
||||
hcBuilder.withBlockSize(2 * 1024);
|
||||
HFileContext hFileContext = hcBuilder.build();
|
||||
StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
|
||||
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
|
||||
hfilePath).withFileContext(hFileContext).build();
|
||||
StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
|
||||
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
|
||||
hfilePath).withFileContext(hFileContext).build();
|
||||
|
||||
MemStore memstore = new DefaultMemStore();
|
||||
writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
|
||||
writer2 });
|
||||
|
||||
StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
|
||||
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
|
||||
|
||||
StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
|
||||
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
|
||||
/**
|
||||
* Test without MVCC
|
||||
*/
|
||||
int startRowNum = ROWSIZE / 2;
|
||||
ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
|
||||
ROWS[startRowNum], MAXMVCC);
|
||||
internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
|
||||
|
||||
startRowNum = ROWSIZE - 1;
|
||||
kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
|
||||
HConstants.EMPTY_START_ROW, MAXMVCC);
|
||||
internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
|
||||
|
||||
/**
|
||||
* Test with MVCC
|
||||
*/
|
||||
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
|
||||
LOG.info("Setting read point to " + readPoint);
|
||||
startRowNum = ROWSIZE - 1;
|
||||
kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
|
||||
HConstants.EMPTY_START_ROW, readPoint);
|
||||
for (int i = startRowNum; i >= 0; i--) {
|
||||
if (i - 2 < 0) break;
|
||||
i = i - 2;
|
||||
kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
|
||||
Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
|
||||
i, 0, readPoint);
|
||||
if (nextReadableNum == null) break;
|
||||
KeyValue expecedKey = makeKV(nextReadableNum.getFirst(),
|
||||
nextReadableNum.getSecond());
|
||||
assertEquals(expecedKey, kvHeap.peek());
|
||||
i = nextReadableNum.getFirst();
|
||||
int qualNum = nextReadableNum.getSecond();
|
||||
if (qualNum + 1 < QUALSIZE) {
|
||||
kvHeap.backwardSeek(makeKV(i, qualNum + 1));
|
||||
nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
|
||||
readPoint);
|
||||
if (nextReadableNum == null) break;
|
||||
expecedKey = makeKV(nextReadableNum.getFirst(),
|
||||
nextReadableNum.getSecond());
|
||||
assertEquals(expecedKey, kvHeap.peek());
|
||||
i = nextReadableNum.getFirst();
|
||||
qualNum = nextReadableNum.getSecond();
|
||||
}
|
||||
|
||||
kvHeap.next();
|
||||
|
||||
if (qualNum + 1 >= QUALSIZE) {
|
||||
nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0,
|
||||
readPoint);
|
||||
} else {
|
||||
nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
|
||||
readPoint);
|
||||
}
|
||||
if (nextReadableNum == null) break;
|
||||
expecedKey = makeKV(nextReadableNum.getFirst(),
|
||||
nextReadableNum.getSecond());
|
||||
assertEquals(expecedKey, kvHeap.peek());
|
||||
i = nextReadableNum.getFirst();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReversibleStoreScanner() throws IOException {
|
||||
// write data to one memstore and two store files
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
Path hfilePath = new Path(new Path(
|
||||
TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"),
|
||||
"familyname");
|
||||
CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
|
||||
HFileContextBuilder hcBuilder = new HFileContextBuilder();
|
||||
hcBuilder.withBlockSize(2 * 1024);
|
||||
HFileContext hFileContext = hcBuilder.build();
|
||||
StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
|
||||
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
|
||||
hfilePath).withFileContext(hFileContext).build();
|
||||
StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
|
||||
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
|
||||
hfilePath).withFileContext(hFileContext).build();
|
||||
|
||||
MemStore memstore = new DefaultMemStore();
|
||||
writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
|
||||
writer2 });
|
||||
|
||||
StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
|
||||
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
|
||||
|
||||
StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
|
||||
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
|
||||
|
||||
ScanType scanType = ScanType.USER_SCAN;
|
||||
ScanInfo scanInfo = new ScanInfo(FAMILYNAME, 0, Integer.MAX_VALUE,
|
||||
Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
|
||||
|
||||
// Case 1.Test a full reversed scan
|
||||
Scan scan = new Scan();
|
||||
scan.setReversed(true);
|
||||
StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2,
|
||||
scan, scanType, scanInfo, MAXMVCC);
|
||||
verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
|
||||
|
||||
// Case 2.Test reversed scan with a specified start row
|
||||
int startRowNum = ROWSIZE / 2;
|
||||
byte[] startRow = ROWS[startRowNum];
|
||||
scan.setStartRow(startRow);
|
||||
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
|
||||
scanType, scanInfo, MAXMVCC);
|
||||
verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
|
||||
startRowNum + 1, false);
|
||||
|
||||
// Case 3.Test reversed scan with a specified start row and specified
|
||||
// qualifiers
|
||||
assertTrue(QUALSIZE > 2);
|
||||
scan.addColumn(FAMILYNAME, QUALS[0]);
|
||||
scan.addColumn(FAMILYNAME, QUALS[2]);
|
||||
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
|
||||
scanType, scanInfo, MAXMVCC);
|
||||
verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1,
|
||||
false);
|
||||
|
||||
// Case 4.Test reversed scan with mvcc based on case 3
|
||||
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
|
||||
LOG.info("Setting read point to " + readPoint);
|
||||
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
|
||||
scanType, scanInfo, readPoint);
|
||||
int expectedRowCount = 0;
|
||||
int expectedKVCount = 0;
|
||||
for (int i = startRowNum; i >= 0; i--) {
|
||||
int kvCount = 0;
|
||||
if (makeMVCC(i, 0) <= readPoint) {
|
||||
kvCount++;
|
||||
}
|
||||
if (makeMVCC(i, 2) <= readPoint) {
|
||||
kvCount++;
|
||||
}
|
||||
if (kvCount > 0) {
|
||||
expectedRowCount++;
|
||||
expectedKVCount += kvCount;
|
||||
}
|
||||
}
|
||||
verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount,
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReversibleRegionScanner() throws IOException {
|
||||
byte[] tableName = Bytes.toBytes("testtable");
|
||||
byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
HRegion region = TEST_UTIL.createLocalHRegion(tableName, null, null,
|
||||
"testReversibleRegionScanner", conf, false, Durability.SYNC_WAL, null,
|
||||
FAMILYNAME, FAMILYNAME2);
|
||||
loadDataToRegion(region, FAMILYNAME2);
|
||||
|
||||
// verify row count with forward scan
|
||||
Scan scan = new Scan();
|
||||
InternalScanner scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
|
||||
|
||||
// Case1:Full reversed scan
|
||||
scan.setReversed(true);
|
||||
scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
|
||||
|
||||
// Case2:Full reversed scan with one family
|
||||
scan = new Scan();
|
||||
scan.setReversed(true);
|
||||
scan.addFamily(FAMILYNAME);
|
||||
scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
|
||||
|
||||
// Case3:Specify qualifiers + One family
|
||||
byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
|
||||
for (byte[] specifiedQualifier : specifiedQualifiers)
|
||||
scan.addColumn(FAMILYNAME, specifiedQualifier);
|
||||
scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
|
||||
|
||||
// Case4:Specify qualifiers + Two families
|
||||
for (byte[] specifiedQualifier : specifiedQualifiers)
|
||||
scan.addColumn(FAMILYNAME2, specifiedQualifier);
|
||||
scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
|
||||
|
||||
// Case5: Case4 + specify start row
|
||||
int startRowNum = ROWSIZE * 3 / 4;
|
||||
scan.setStartRow(ROWS[startRowNum]);
|
||||
scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1),
|
||||
false);
|
||||
|
||||
// Case6: Case4 + specify stop row
|
||||
int stopRowNum = ROWSIZE / 4;
|
||||
scan.setStartRow(HConstants.EMPTY_BYTE_ARRAY);
|
||||
scan.setStopRow(ROWS[stopRowNum]);
|
||||
scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE
|
||||
- stopRowNum - 1), false);
|
||||
|
||||
// Case7: Case4 + specify start row + specify stop row
|
||||
scan.setStartRow(ROWS[startRowNum]);
|
||||
scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2,
|
||||
(startRowNum - stopRowNum), false);
|
||||
|
||||
// Case8: Case7 + SingleColumnValueFilter
|
||||
int valueNum = startRowNum % VALUESIZE;
|
||||
Filter filter = new SingleColumnValueFilter(FAMILYNAME,
|
||||
specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]);
|
||||
scan.setFilter(filter);
|
||||
scanner = region.getScanner(scan);
|
||||
int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE
|
||||
+ (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
|
||||
verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum,
|
||||
false);
|
||||
|
||||
// Case9: Case7 + PageFilter
|
||||
int pageSize = 10;
|
||||
filter = new PageFilter(pageSize);
|
||||
scan.setFilter(filter);
|
||||
scanner = region.getScanner(scan);
|
||||
int expectedRowNum = pageSize;
|
||||
verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
|
||||
|
||||
// Case10: Case7 + FilterList+MUST_PASS_ONE
|
||||
SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(
|
||||
FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]);
|
||||
SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(
|
||||
FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]);
|
||||
expectedRowNum = 0;
|
||||
for (int i = startRowNum; i > stopRowNum; i--) {
|
||||
if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
|
||||
expectedRowNum++;
|
||||
}
|
||||
}
|
||||
filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
|
||||
scan.setFilter(filter);
|
||||
scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
|
||||
|
||||
// Case10: Case7 + FilterList+MUST_PASS_ALL
|
||||
filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
|
||||
expectedRowNum = 0;
|
||||
scan.setFilter(filter);
|
||||
scanner = region.getScanner(scan);
|
||||
verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
|
||||
}
|
||||
|
||||
private StoreScanner getReversibleStoreScanner(MemStore memstore,
|
||||
StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType,
|
||||
ScanInfo scanInfo, int readPoint) throws IOException {
|
||||
List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null,
|
||||
false, readPoint);
|
||||
NavigableSet<byte[]> columns = null;
|
||||
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
|
||||
.entrySet()) {
|
||||
// Should only one family
|
||||
columns = entry.getValue();
|
||||
}
|
||||
StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo,
|
||||
scanType, columns, scanners);
|
||||
return storeScanner;
|
||||
}
|
||||
|
||||
private void verifyCountAndOrder(InternalScanner scanner,
|
||||
int expectedKVCount, int expectedRowCount, boolean forward)
|
||||
throws IOException {
|
||||
List<Cell> kvList = new ArrayList<Cell>();
|
||||
Result lastResult = null;
|
||||
int rowCount = 0;
|
||||
int kvCount = 0;
|
||||
try {
|
||||
while (scanner.next(kvList)) {
|
||||
if (kvList.isEmpty()) continue;
|
||||
rowCount++;
|
||||
kvCount += kvList.size();
|
||||
if (lastResult != null) {
|
||||
Result curResult = Result.create(kvList);
|
||||
assertEquals("LastResult:" + lastResult + "CurResult:" + curResult,
|
||||
forward,
|
||||
Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
|
||||
}
|
||||
lastResult = Result.create(kvList);
|
||||
kvList.clear();
|
||||
}
|
||||
} finally {
|
||||
scanner.close();
|
||||
}
|
||||
if (!kvList.isEmpty()) {
|
||||
rowCount++;
|
||||
kvCount += kvList.size();
|
||||
kvList.clear();
|
||||
}
|
||||
assertEquals(expectedKVCount, kvCount);
|
||||
assertEquals(expectedRowCount, rowCount);
|
||||
}
|
||||
|
||||
private void internalTestSeekAndNextForReversibleKeyValueHeap(
|
||||
ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException {
|
||||
// Test next and seek
|
||||
for (int i = startRowNum; i >= 0; i--) {
|
||||
if (i % 2 == 1 && i - 2 >= 0) {
|
||||
i = i - 2;
|
||||
kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
|
||||
}
|
||||
for (int j = 0; j < QUALSIZE; j++) {
|
||||
if (j % 2 == 1 && (j + 1) < QUALSIZE) {
|
||||
j = j + 1;
|
||||
kvHeap.backwardSeek(makeKV(i, j));
|
||||
}
|
||||
assertEquals(makeKV(i, j), kvHeap.peek());
|
||||
kvHeap.next();
|
||||
}
|
||||
}
|
||||
assertEquals(null, kvHeap.peek());
|
||||
}
|
||||
|
||||
private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore,
|
||||
StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint)
|
||||
throws IOException {
|
||||
List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow,
|
||||
true, readPoint);
|
||||
ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners,
|
||||
KeyValue.COMPARATOR);
|
||||
return kvHeap;
|
||||
}
|
||||
|
||||
private List<KeyValueScanner> getScanners(MemStore memstore, StoreFile sf1,
|
||||
StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint)
|
||||
throws IOException {
|
||||
List<StoreFileScanner> fileScanners = StoreFileScanner
|
||||
.getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
|
||||
false, false, readPoint);
|
||||
List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
|
||||
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
|
||||
fileScanners.size() + 1);
|
||||
scanners.addAll(fileScanners);
|
||||
scanners.addAll(memScanners);
|
||||
|
||||
if (doSeek) {
|
||||
if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
scanner.seekToLastRow();
|
||||
}
|
||||
} else {
|
||||
KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow);
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
scanner.backwardSeek(startKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
return scanners;
|
||||
}
|
||||
|
||||
private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner)
|
||||
throws IOException {
|
||||
/**
|
||||
* Test without MVCC
|
||||
*/
|
||||
// Test seek to last row
|
||||
assertTrue(scanner.seekToLastRow());
|
||||
assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
|
||||
|
||||
// Test backward seek in three cases
|
||||
// Case1: seek in the same row in backwardSeek
|
||||
KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
|
||||
assertTrue(scanner.backwardSeek(seekKey));
|
||||
assertEquals(seekKey, scanner.peek());
|
||||
|
||||
// Case2: seek to the previous row in backwardSeek
|
||||
int seekRowNum = ROWSIZE - 2;
|
||||
assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum])));
|
||||
KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
|
||||
assertEquals(expectedKey, scanner.peek());
|
||||
|
||||
// Case3: unable to backward seek
|
||||
assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0])));
|
||||
assertEquals(null, scanner.peek());
|
||||
|
||||
// Test seek to previous row
|
||||
seekRowNum = ROWSIZE - 4;
|
||||
assertTrue(scanner.seekToPreviousRow(KeyValueUtil
|
||||
.createFirstOnRow(ROWS[seekRowNum])));
|
||||
expectedKey = makeKV(seekRowNum - 1, 0);
|
||||
assertEquals(expectedKey, scanner.peek());
|
||||
|
||||
// Test seek to previous row for the first row
|
||||
assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
|
||||
assertEquals(null, scanner.peek());
|
||||
|
||||
}
|
||||
|
||||
private void seekTestOfReversibleKeyValueScannerWithMVCC(
|
||||
KeyValueScanner scanner, int readPoint) throws IOException {
|
||||
/**
|
||||
* Test with MVCC
|
||||
*/
|
||||
// Test seek to last row
|
||||
KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
|
||||
ROWSIZE - 1, 0, readPoint);
|
||||
assertEquals(expectedKey != null, scanner.seekToLastRow());
|
||||
assertEquals(expectedKey, scanner.peek());
|
||||
|
||||
// Test backward seek in two cases
|
||||
// Case1: seek in the same row in backwardSeek
|
||||
expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
|
||||
QUALSIZE - 2, readPoint);
|
||||
assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey));
|
||||
assertEquals(expectedKey, scanner.peek());
|
||||
|
||||
// Case2: seek to the previous row in backwardSeek
|
||||
int seekRowNum = ROWSIZE - 3;
|
||||
KeyValue seekKey = KeyValueUtil.createLastOnRow(ROWS[seekRowNum]);
|
||||
expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
|
||||
readPoint);
|
||||
assertEquals(expectedKey != null, scanner.backwardSeek(seekKey));
|
||||
assertEquals(expectedKey, scanner.peek());
|
||||
|
||||
// Test seek to previous row
|
||||
seekRowNum = ROWSIZE - 4;
|
||||
expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
|
||||
readPoint);
|
||||
assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValueUtil
|
||||
.createFirstOnRow(ROWS[seekRowNum])));
|
||||
assertEquals(expectedKey, scanner.peek());
|
||||
}
|
||||
|
||||
private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,
|
||||
int startQualNum, int readPoint) {
|
||||
Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
|
||||
startRowNum, startQualNum, readPoint);
|
||||
if (nextReadableNum == null)
|
||||
return null;
|
||||
return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
|
||||
}
|
||||
|
||||
private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(
|
||||
int startRowNum, int startQualNum, int readPoint) {
|
||||
Pair<Integer, Integer> nextReadableNum = null;
|
||||
boolean findExpected = false;
|
||||
for (int i = startRowNum; i >= 0; i--) {
|
||||
for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
|
||||
if (makeMVCC(i, j) <= readPoint) {
|
||||
nextReadableNum = new Pair<Integer, Integer>(i, j);
|
||||
findExpected = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (findExpected)
|
||||
break;
|
||||
}
|
||||
return nextReadableNum;
|
||||
}
|
||||
|
||||
private static void loadDataToRegion(Region region, byte[] additionalFamily)
|
||||
throws IOException {
|
||||
for (int i = 0; i < ROWSIZE; i++) {
|
||||
Put put = new Put(ROWS[i]);
|
||||
for (int j = 0; j < QUALSIZE; j++) {
|
||||
put.add(makeKV(i, j));
|
||||
// put additional family
|
||||
put.add(makeKV(i, j, additionalFamily));
|
||||
}
|
||||
region.put(put);
|
||||
if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
|
||||
region.flush(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void writeMemstoreAndStoreFiles(MemStore memstore,
|
||||
final StoreFile.Writer[] writers) throws IOException {
|
||||
try {
|
||||
for (int i = 0; i < ROWSIZE; i++) {
|
||||
for (int j = 0; j < QUALSIZE; j++) {
|
||||
if (i % 2 == 0) {
|
||||
memstore.add(makeKV(i, j));
|
||||
} else {
|
||||
writers[(i + j) % writers.length].append(makeKV(i, j));
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
for (int i = 0; i < writers.length; i++) {
|
||||
writers[i].close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void writeStoreFile(final StoreFile.Writer writer)
|
||||
throws IOException {
|
||||
try {
|
||||
for (int i = 0; i < ROWSIZE; i++) {
|
||||
for (int j = 0; j < QUALSIZE; j++) {
|
||||
writer.append(makeKV(i, j));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static void writeMemstore(MemStore memstore) throws IOException {
|
||||
// Add half of the keyvalues to memstore
|
||||
for (int i = 0; i < ROWSIZE; i++) {
|
||||
for (int j = 0; j < QUALSIZE; j++) {
|
||||
if ((i + j) % 2 == 0) {
|
||||
memstore.add(makeKV(i, j));
|
||||
}
|
||||
}
|
||||
}
|
||||
memstore.snapshot();
|
||||
// Add another half of the keyvalues to snapshot
|
||||
for (int i = 0; i < ROWSIZE; i++) {
|
||||
for (int j = 0; j < QUALSIZE; j++) {
|
||||
if ((i + j) % 2 == 1) {
|
||||
memstore.add(makeKV(i, j));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static KeyValue makeKV(int rowNum, int cqNum) {
|
||||
return makeKV(rowNum, cqNum, FAMILYNAME);
|
||||
}
|
||||
|
||||
private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
|
||||
KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
|
||||
VALUES[rowNum % VALUESIZE]);
|
||||
kv.setSequenceId(makeMVCC(rowNum, cqNum));
|
||||
return kv;
|
||||
}
|
||||
|
||||
private static long makeMVCC(int rowNum, int cqNum) {
|
||||
return (rowNum + cqNum) % (MAXMVCC + 1);
|
||||
}
|
||||
|
||||
private static byte[][] makeN(byte[] base, int n) {
|
||||
byte[][] ret = new byte[n][];
|
||||
for (int i = 0; i < n; i++) {
|
||||
ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
|
@ -30,6 +30,8 @@ import java.util.TreeSet;
|
|||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
|
@ -47,7 +49,8 @@ import org.junit.experimental.categories.Category;
|
|||
public class TestStoreScanner extends TestCase {
|
||||
private static final String CF_STR = "cf";
|
||||
final byte [] CF = Bytes.toBytes(CF_STR);
|
||||
private ScanInfo scanInfo = new ScanInfo(CF, 0, Integer.MAX_VALUE,
|
||||
static Configuration CONF = HBaseConfiguration.create();
|
||||
private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE,
|
||||
Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
|
||||
private ScanType scanType = ScanType.USER_SCAN;
|
||||
|
||||
|
@ -416,7 +419,7 @@ public class TestStoreScanner extends TestCase {
|
|||
List<KeyValueScanner> scanners = scanFixture(kvs);
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(1);
|
||||
ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
|
||||
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
|
||||
KeyValue.COMPARATOR);
|
||||
ScanType scanType = ScanType.USER_SCAN;
|
||||
StoreScanner scanner =
|
||||
|
@ -487,7 +490,7 @@ public class TestStoreScanner extends TestCase {
|
|||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(1);
|
||||
// scanner with ttl equal to 500
|
||||
ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
|
||||
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
|
||||
KeyValue.COMPARATOR);
|
||||
ScanType scanType = ScanType.USER_SCAN;
|
||||
StoreScanner scanner =
|
||||
|
@ -547,7 +550,7 @@ public class TestStoreScanner extends TestCase {
|
|||
List<KeyValueScanner> scanners = scanFixture(kvs);
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(2);
|
||||
ScanInfo scanInfo = new ScanInfo(Bytes.toBytes("cf"),
|
||||
ScanInfo scanInfo = new ScanInfo(CONF, Bytes.toBytes("cf"),
|
||||
0 /* minVersions */,
|
||||
2 /* maxVersions */, 500 /* ttl */,
|
||||
KeepDeletedCells.FALSE /* keepDeletedCells */,
|
||||
|
|
|
@ -187,7 +187,7 @@ public class TestStripeCompactor {
|
|||
|
||||
// Create store mock that is satisfactory for compactor.
|
||||
HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
|
||||
ScanInfo si = new ScanInfo(col, Long.MAX_VALUE, 0, new KVComparator());
|
||||
ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, new KVComparator());
|
||||
Store store = mock(Store.class);
|
||||
when(store.getFamily()).thenReturn(col);
|
||||
when(store.getScanInfo()).thenReturn(si);
|
||||
|
|
|
@ -248,7 +248,8 @@ public class TestCoprocessorScanPolicy {
|
|||
Integer newVersions = versions.get(store.getTableName());
|
||||
ScanInfo oldSI = store.getScanInfo();
|
||||
HColumnDescriptor family = store.getFamily();
|
||||
ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
|
||||
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
|
||||
family.getName(), family.getMinVersions(),
|
||||
newVersions == null ? family.getMaxVersions() : newVersions,
|
||||
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
|
||||
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
|
||||
|
@ -268,7 +269,8 @@ public class TestCoprocessorScanPolicy {
|
|||
Integer newVersions = versions.get(store.getTableName());
|
||||
ScanInfo oldSI = store.getScanInfo();
|
||||
HColumnDescriptor family = store.getFamily();
|
||||
ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
|
||||
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
|
||||
family.getName(), family.getMinVersions(),
|
||||
newVersions == null ? family.getMaxVersions() : newVersions,
|
||||
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
|
||||
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
|
||||
|
@ -288,7 +290,8 @@ public class TestCoprocessorScanPolicy {
|
|||
Integer newVersions = versions.get(store.getTableName());
|
||||
ScanInfo oldSI = store.getScanInfo();
|
||||
HColumnDescriptor family = store.getFamily();
|
||||
ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
|
||||
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
|
||||
family.getName(), family.getMinVersions(),
|
||||
newVersions == null ? family.getMaxVersions() : newVersions,
|
||||
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
|
||||
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
|
||||
|
@ -299,5 +302,4 @@ public class TestCoprocessorScanPolicy {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue