HBASE-18220 Compaction scanners need not reopen storefile scanners while
trying to switch over from pread to stream (Ram)
This commit is contained in:
parent
dd1d81ef5a
commit
020f520d17
|
@ -168,7 +168,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
|
|
||||||
/** An internal constructor. */
|
/** An internal constructor. */
|
||||||
protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
|
protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
|
||||||
final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
|
final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks, ScanType scanType) {
|
||||||
this.readPt = readPt;
|
this.readPt = readPt;
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.cacheBlocks = cacheBlocks;
|
this.cacheBlocks = cacheBlocks;
|
||||||
|
@ -198,7 +198,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
}
|
}
|
||||||
// Always start with pread unless user specific stream. Will change to stream later if
|
// Always start with pread unless user specific stream. Will change to stream later if
|
||||||
// readType is default if the scan keeps running for a long time.
|
// readType is default if the scan keeps running for a long time.
|
||||||
this.scanUsePread = this.readType != Scan.ReadType.STREAM;
|
if (scanType != ScanType.COMPACT_DROP_DELETES
|
||||||
|
&& scanType != ScanType.COMPACT_RETAIN_DELETES) {
|
||||||
|
// For compaction scanners never use Pread as already we have stream based scanners on the
|
||||||
|
// store files to be compacted
|
||||||
|
this.scanUsePread = this.readType != Scan.ReadType.STREAM;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
this.preadMaxBytes = scanInfo.getPreadMaxBytes();
|
this.preadMaxBytes = scanInfo.getPreadMaxBytes();
|
||||||
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
|
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
|
||||||
|
@ -228,7 +233,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
|
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
|
||||||
long readPt)
|
long readPt)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
|
this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks(), ScanType.USER_SCAN);
|
||||||
if (columns != null && scan.isRaw()) {
|
if (columns != null && scan.isRaw()) {
|
||||||
throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
|
throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
|
||||||
}
|
}
|
||||||
|
@ -302,7 +307,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
|
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
|
||||||
long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
|
long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
|
||||||
this(store, scan, scanInfo, null,
|
this(store, scan, scanInfo, null,
|
||||||
((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false);
|
((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
|
||||||
if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
|
if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
|
||||||
|| (scan.getStopRow() != null && scan.getStopRow().length > 0)
|
|| (scan.getStopRow() != null && scan.getStopRow().length > 0)
|
||||||
|| !scan.getTimeRange().isAllTime()) {
|
|| !scan.getTimeRange().isAllTime()) {
|
||||||
|
@ -351,7 +356,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs,
|
final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs,
|
||||||
long readPt) throws IOException {
|
long readPt) throws IOException {
|
||||||
this(null, scan, scanInfo, columns, readPt,
|
this(null, scan, scanInfo, columns, readPt,
|
||||||
scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false);
|
scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false, scanType);
|
||||||
if (scanType == ScanType.USER_SCAN) {
|
if (scanType == ScanType.USER_SCAN) {
|
||||||
this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
|
this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
|
||||||
null);
|
null);
|
||||||
|
@ -385,6 +390,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
|
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean isScanUsePread() {
|
||||||
|
return this.scanUsePread;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Seek the specified scanners with the given key
|
* Seek the specified scanners with the given key
|
||||||
* @param scanners
|
* @param scanners
|
||||||
|
|
|
@ -992,4 +992,20 @@ public class TestStoreScanner {
|
||||||
EnvironmentEdgeManagerTestHelper.reset();
|
EnvironmentEdgeManagerTestHelper.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreadNotEnabledForCompactionStoreScanners() throws Exception {
|
||||||
|
ScanType scanType = ScanType.COMPACT_RETAIN_DELETES;
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
KeyValue[] kvs = new KeyValue[] {
|
||||||
|
new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, now - 1000,
|
||||||
|
KeyValue.Type.DeleteFamily),
|
||||||
|
KeyValueTestUtil.create("R1", "cf", "a", now - 10, KeyValue.Type.Put, "dont-care"), };
|
||||||
|
List<KeyValueScanner> scanners = scanFixture(kvs);
|
||||||
|
Scan scan = new Scan();
|
||||||
|
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
|
||||||
|
HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
|
||||||
|
StoreScanner storeScanner = new StoreScanner(scan, scanInfo, scanType, null, scanners);
|
||||||
|
assertFalse(storeScanner.isScanUsePread());
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue