HBASE-24973 Remove read point parameter in method StoreFlush#performFlush and StoreFlush#createScanner (#2337)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
e042cabfb3
commit
0c290bfa0b
|
@ -107,8 +107,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
||||||
if (cellsCount == 0) return result; // don't flush if there are no entries
|
if (cellsCount == 0) return result; // don't flush if there are no entries
|
||||||
|
|
||||||
// Use a store scanner to find which rows to flush.
|
// Use a store scanner to find which rows to flush.
|
||||||
long smallestReadPoint = store.getSmallestReadPoint();
|
InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
|
||||||
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
|
||||||
StoreFileWriter writer;
|
StoreFileWriter writer;
|
||||||
try {
|
try {
|
||||||
// TODO: We can fail in the below block before we complete adding this flush to
|
// TODO: We can fail in the below block before we complete adding this flush to
|
||||||
|
|
|
@ -52,8 +52,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
|
||||||
if (cellsCount == 0) return result; // don't flush if there are no entries
|
if (cellsCount == 0) return result; // don't flush if there are no entries
|
||||||
|
|
||||||
// Use a store scanner to find which rows to flush.
|
// Use a store scanner to find which rows to flush.
|
||||||
long smallestReadPoint = store.getSmallestReadPoint();
|
InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
|
||||||
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
|
||||||
StoreFileWriter writer;
|
StoreFileWriter writer;
|
||||||
try {
|
try {
|
||||||
// TODO: We can fail in the below block before we complete adding this flush to
|
// TODO: We can fail in the below block before we complete adding this flush to
|
||||||
|
@ -66,7 +65,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
|
||||||
snapshot.isTagsPresent(), false);
|
snapshot.isTagsPresent(), false);
|
||||||
IOException e = null;
|
IOException e = null;
|
||||||
try {
|
try {
|
||||||
performFlush(scanner, writer, smallestReadPoint, throughputController);
|
performFlush(scanner, writer, throughputController);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
e = ioe;
|
e = ioe;
|
||||||
// throw the exception out
|
// throw the exception out
|
||||||
|
|
|
@ -73,20 +73,20 @@ abstract class StoreFlusher {
|
||||||
/**
|
/**
|
||||||
* Creates the scanner for flushing snapshot. Also calls coprocessors.
|
* Creates the scanner for flushing snapshot. Also calls coprocessors.
|
||||||
* @param snapshotScanners
|
* @param snapshotScanners
|
||||||
* @param smallestReadPoint
|
|
||||||
* @return The scanner; null if coprocessor is canceling the flush.
|
* @return The scanner; null if coprocessor is canceling the flush.
|
||||||
*/
|
*/
|
||||||
protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
|
protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
|
||||||
long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException {
|
FlushLifeCycleTracker tracker) throws IOException {
|
||||||
ScanInfo scanInfo;
|
ScanInfo scanInfo;
|
||||||
if (store.getCoprocessorHost() != null) {
|
if (store.getCoprocessorHost() != null) {
|
||||||
scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker);
|
scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker);
|
||||||
} else {
|
} else {
|
||||||
scanInfo = store.getScanInfo();
|
scanInfo = store.getScanInfo();
|
||||||
}
|
}
|
||||||
|
final long smallestReadPoint = store.getSmallestReadPoint();
|
||||||
InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners,
|
InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners,
|
||||||
ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
|
ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
|
||||||
assert scanner != null;
|
|
||||||
if (store.getCoprocessorHost() != null) {
|
if (store.getCoprocessorHost() != null) {
|
||||||
try {
|
try {
|
||||||
return store.getCoprocessorHost().preFlush(store, scanner, tracker);
|
return store.getCoprocessorHost().preFlush(store, scanner, tracker);
|
||||||
|
@ -102,11 +102,10 @@ abstract class StoreFlusher {
|
||||||
* Performs memstore flush, writing data from scanner into sink.
|
* Performs memstore flush, writing data from scanner into sink.
|
||||||
* @param scanner Scanner to get data from.
|
* @param scanner Scanner to get data from.
|
||||||
* @param sink Sink to write data to. Could be StoreFile.Writer.
|
* @param sink Sink to write data to. Could be StoreFile.Writer.
|
||||||
* @param smallestReadPoint Smallest read point used for the flush.
|
|
||||||
* @param throughputController A controller to avoid flush too fast
|
* @param throughputController A controller to avoid flush too fast
|
||||||
*/
|
*/
|
||||||
protected void performFlush(InternalScanner scanner, CellSink sink,
|
protected void performFlush(InternalScanner scanner, CellSink sink,
|
||||||
long smallestReadPoint, ThroughputController throughputController) throws IOException {
|
ThroughputController throughputController) throws IOException {
|
||||||
int compactionKVMax =
|
int compactionKVMax =
|
||||||
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
|
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
|
||||||
|
|
||||||
|
|
|
@ -61,8 +61,7 @@ public class StripeStoreFlusher extends StoreFlusher {
|
||||||
int cellsCount = snapshot.getCellsCount();
|
int cellsCount = snapshot.getCellsCount();
|
||||||
if (cellsCount == 0) return result; // don't flush if there are no entries
|
if (cellsCount == 0) return result; // don't flush if there are no entries
|
||||||
|
|
||||||
long smallestReadPoint = store.getSmallestReadPoint();
|
InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
|
||||||
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
|
||||||
|
|
||||||
// Let policy select flush method.
|
// Let policy select flush method.
|
||||||
StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
|
StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
|
||||||
|
@ -77,7 +76,7 @@ public class StripeStoreFlusher extends StoreFlusher {
|
||||||
mw.init(storeScanner, factory);
|
mw.init(storeScanner, factory);
|
||||||
|
|
||||||
synchronized (flushLock) {
|
synchronized (flushLock) {
|
||||||
performFlush(scanner, mw, smallestReadPoint, throughputController);
|
performFlush(scanner, mw, throughputController);
|
||||||
result = mw.commitWriters(cacheFlushSeqNum, false);
|
result = mw.commitWriters(cacheFlushSeqNum, false);
|
||||||
success = true;
|
success = true;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue