HBASE-4465 Lazy-seek optimization for StoreFile scanners (mikhail/liyin)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1179442 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
435fea6d79
commit
ead9159ecd
|
@ -11,6 +11,7 @@ Release 0.93.0 - Unreleased
|
|||
HBASE-4477 Ability for an application to store metadata into the
|
||||
transaction log (dhruba via jgray)
|
||||
HBASE-4145 Provide metrics for hbase client (Ming Ma)
|
||||
HBASE-4465 Lazy-seek optimization for StoreFile scanners (mikhail/liyin)
|
||||
|
||||
BUG FIXES
|
||||
HBASE-4488 Store could miss rows during flush (Lars H via jgray)
|
||||
|
|
|
@ -1756,6 +1756,21 @@ public class KeyValue implements Writable, HeapSize {
|
|||
HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the first KV with the row/family/qualifier of this KV and the
|
||||
* given timestamp. Uses the "maximum" KV type that guarantees that the new
|
||||
* KV is the lowest possible for this combination of row, family, qualifier,
|
||||
* and timestamp. This KV's own timestamp is ignored. While this function
|
||||
* copies the value from this KV, it is normally used on key-only KVs.
|
||||
*/
|
||||
public KeyValue createFirstOnRowColTS(long ts) {
|
||||
return new KeyValue(
|
||||
bytes, getRowOffset(), getRowLength(),
|
||||
bytes, getFamilyOffset(), getFamilyLength(),
|
||||
bytes, getQualifierOffset(), getQualifierLength(),
|
||||
ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param b
|
||||
* @return A KeyValue made of a byte array that holds the key-only part.
|
||||
|
|
|
@ -106,16 +106,4 @@ public class ColumnCount {
|
|||
this.count = count;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check to see if needed to fetch more versions
|
||||
* @param max
|
||||
* @return true if more versions are needed, false otherwise
|
||||
*/
|
||||
public boolean needMore(int max) {
|
||||
if(this.count < max) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,34 +40,23 @@ import java.util.PriorityQueue;
|
|||
* also implements InternalScanner. WARNING: As is, if you try to use this
|
||||
* as an InternalScanner at the Store level, you will get runtime exceptions.
|
||||
*/
|
||||
public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
||||
public class KeyValueHeap extends NonLazyKeyValueScanner
|
||||
implements KeyValueScanner, InternalScanner {
|
||||
private PriorityQueue<KeyValueScanner> heap = null;
|
||||
private KeyValueScanner current = null;
|
||||
private KVScannerComparator comparator;
|
||||
|
||||
/**
|
||||
* A helper enum that knows how to call the correct seek function within a
|
||||
* {@link KeyValueScanner}.
|
||||
* The current sub-scanner, i.e. the one that contains the next key/value
|
||||
* to return to the client. This scanner is NOT included in {@link #heap}
|
||||
* (but we frequently add it back to the heap and pull the new winner out).
|
||||
* We maintain an invariant that the current sub-scanner has already done
|
||||
* a real seek, and that current.peek() is always a real key/value (or null)
|
||||
* except for the fake last-key-on-row-column supplied by the multi-column
|
||||
* Bloom filter optimization, which is OK to propagate to StoreScanner. In
|
||||
* order to ensure that, always use {@link #pollRealKV()} to update current.
|
||||
*/
|
||||
public enum SeekType {
|
||||
NORMAL {
|
||||
@Override
|
||||
public boolean seek(KeyValueScanner scanner, KeyValue kv,
|
||||
boolean forward) throws IOException {
|
||||
return forward ? scanner.reseek(kv) : scanner.seek(kv);
|
||||
}
|
||||
},
|
||||
EXACT {
|
||||
@Override
|
||||
public boolean seek(KeyValueScanner scanner, KeyValue kv,
|
||||
boolean forward) throws IOException {
|
||||
return scanner.seekExactly(kv, forward);
|
||||
}
|
||||
};
|
||||
private KeyValueScanner current = null;
|
||||
|
||||
public abstract boolean seek(KeyValueScanner scanner, KeyValue kv,
|
||||
boolean forward) throws IOException;
|
||||
}
|
||||
private KVScannerComparator comparator;
|
||||
|
||||
/**
|
||||
* Constructor. This KeyValueHeap will handle closing of passed in
|
||||
|
@ -76,7 +65,7 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
* @param comparator
|
||||
*/
|
||||
public KeyValueHeap(List<? extends KeyValueScanner> scanners,
|
||||
KVComparator comparator) {
|
||||
KVComparator comparator) throws IOException {
|
||||
this.comparator = new KVScannerComparator(comparator);
|
||||
if (!scanners.isEmpty()) {
|
||||
this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
|
||||
|
@ -88,7 +77,7 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
scanner.close();
|
||||
}
|
||||
}
|
||||
this.current = heap.poll();
|
||||
this.current = pollRealKV();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,13 +96,13 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
KeyValue kvNext = this.current.peek();
|
||||
if (kvNext == null) {
|
||||
this.current.close();
|
||||
this.current = this.heap.poll();
|
||||
this.current = pollRealKV();
|
||||
} else {
|
||||
KeyValueScanner topScanner = this.heap.peek();
|
||||
if (topScanner == null ||
|
||||
this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
|
||||
this.heap.add(this.current);
|
||||
this.current = this.heap.poll();
|
||||
this.current = pollRealKV();
|
||||
}
|
||||
}
|
||||
return kvReturn;
|
||||
|
@ -149,7 +138,7 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
} else {
|
||||
this.heap.add(this.current);
|
||||
}
|
||||
this.current = this.heap.poll();
|
||||
this.current = pollRealKV();
|
||||
return (this.current != null);
|
||||
}
|
||||
|
||||
|
@ -230,13 +219,20 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
* <p>
|
||||
* As individual scanners may run past their ends, those scanners are
|
||||
* automatically closed and removed from the heap.
|
||||
* <p>
|
||||
* This function (and {@link #reseek(KeyValue)}) does not do multi-column
|
||||
* Bloom filter and lazy-seek optimizations. To enable those, call
|
||||
* {@link #requestSeek(KeyValue, boolean, boolean)}.
|
||||
* @param seekKey KeyValue to seek at or after
|
||||
* @return true if KeyValues exist at or after specified key, false if not
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public boolean seek(KeyValue seekKey) throws IOException {
|
||||
return generalizedSeek(seekKey, SeekType.NORMAL, false);
|
||||
return generalizedSeek(false, // This is not a lazy seek
|
||||
seekKey,
|
||||
false, // forward (false: this is not a reseek)
|
||||
false); // Not using Bloom filters
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -245,20 +241,36 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
*/
|
||||
@Override
|
||||
public boolean reseek(KeyValue seekKey) throws IOException {
|
||||
return generalizedSeek(seekKey, SeekType.NORMAL, true);
|
||||
return generalizedSeek(false, // This is not a lazy seek
|
||||
seekKey,
|
||||
true, // forward (true because this is reseek)
|
||||
false); // Not using Bloom filters
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean seekExactly(KeyValue seekKey, boolean forward)
|
||||
throws IOException {
|
||||
return generalizedSeek(seekKey, SeekType.EXACT, forward);
|
||||
public boolean requestSeek(KeyValue key, boolean forward,
|
||||
boolean useBloom) throws IOException {
|
||||
return generalizedSeek(true, key, forward, useBloom);
|
||||
}
|
||||
|
||||
private boolean generalizedSeek(KeyValue seekKey, SeekType seekType,
|
||||
boolean forward) throws IOException {
|
||||
/**
|
||||
* @param isLazy whether we are trying to seek to exactly the given row/col.
|
||||
* Enables Bloom filter and most-recent-file-first optimizations for
|
||||
* multi-column get/scan queries.
|
||||
* @param seekKey key to seek to
|
||||
* @param forward whether to seek forward (also known as reseek)
|
||||
* @param useBloom whether to optimize seeks using Bloom filters
|
||||
*/
|
||||
private boolean generalizedSeek(boolean isLazy, KeyValue seekKey,
|
||||
boolean forward, boolean useBloom) throws IOException {
|
||||
if (!isLazy && useBloom) {
|
||||
throw new IllegalArgumentException("Multi-column Bloom filter " +
|
||||
"optimization requires a lazy seek");
|
||||
}
|
||||
|
||||
if (current == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -269,12 +281,26 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
while ((scanner = heap.poll()) != null) {
|
||||
KeyValue topKey = scanner.peek();
|
||||
if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
|
||||
// Top KeyValue is at-or-after Seek KeyValue
|
||||
current = scanner;
|
||||
return true;
|
||||
// Top KeyValue is at-or-after Seek KeyValue. We only know that all
|
||||
// scanners are at or after seekKey (because fake keys of
|
||||
// scanners where a lazy-seek operation has been done are not greater
|
||||
// than their real next keys) but we still need to enforce our
|
||||
// invariant that the top scanner has done a real seek. This way
|
||||
// StoreScanner and RegionScanner do not have to worry about fake keys.
|
||||
heap.add(scanner);
|
||||
current = pollRealKV();
|
||||
return current != null;
|
||||
}
|
||||
|
||||
if (!seekType.seek(scanner, seekKey, forward)) {
|
||||
|
||||
boolean seekResult;
|
||||
if (isLazy) {
|
||||
seekResult = scanner.requestSeek(seekKey, forward, useBloom);
|
||||
} else {
|
||||
seekResult = NonLazyKeyValueScanner.doRealSeek(
|
||||
scanner, seekKey, forward);
|
||||
}
|
||||
|
||||
if (!seekResult) {
|
||||
scanner.close();
|
||||
} else {
|
||||
heap.add(scanner);
|
||||
|
@ -285,6 +311,62 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the top sub-scanner from the priority queue, ensuring that a real
|
||||
* seek has been done on it. Works by fetching the top sub-scanner, and if it
|
||||
* has not done a real seek, making it do so (which will modify its top KV),
|
||||
* putting it back, and repeating this until success. Relies on the fact that
|
||||
* on a lazy seek we set the current key of a StoreFileScanner to a KV that
|
||||
* is not greater than the real next KV to be read from that file, so the
|
||||
* scanner that bubbles up to the top of the heap will have global next KV in
|
||||
* this scanner heap if (1) it has done a real seek and (2) its KV is the top
|
||||
* among all top KVs (some of which are fake) in the scanner heap.
|
||||
*/
|
||||
private KeyValueScanner pollRealKV() throws IOException {
|
||||
KeyValueScanner kvScanner = heap.poll();
|
||||
if (kvScanner == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
while (kvScanner != null && !kvScanner.realSeekDone()) {
|
||||
if (kvScanner.peek() != null) {
|
||||
kvScanner.enforceSeek();
|
||||
KeyValue curKV = kvScanner.peek();
|
||||
if (curKV != null) {
|
||||
KeyValueScanner nextEarliestScanner = heap.peek();
|
||||
if (nextEarliestScanner == null) {
|
||||
// The heap is empty. Return the only possible scanner.
|
||||
return kvScanner;
|
||||
}
|
||||
|
||||
// Compare the current scanner to the next scanner. We try to avoid
|
||||
// putting the current one back into the heap if possible.
|
||||
KeyValue nextKV = nextEarliestScanner.peek();
|
||||
if (nextKV == null || comparator.compare(curKV, nextKV) <= 0) {
|
||||
// We already have the scanner with the earliest KV, so return it.
|
||||
return kvScanner;
|
||||
}
|
||||
|
||||
// Otherwise, put the scanner back into the heap and let it compete
|
||||
// against all other scanners (both those that have done a "real
|
||||
// seek" and a "lazy seek").
|
||||
heap.add(kvScanner);
|
||||
} else {
|
||||
// Close the scanner because we did a real seek and found out there
|
||||
// are no more KVs.
|
||||
kvScanner.close();
|
||||
}
|
||||
} else {
|
||||
// Close the scanner because it has already run out of KVs even before
|
||||
// we had to do a real seek on it.
|
||||
kvScanner.close();
|
||||
}
|
||||
kvScanner = heap.poll();
|
||||
}
|
||||
|
||||
return kvScanner;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the current Heap
|
||||
*/
|
||||
|
|
|
@ -56,16 +56,6 @@ public interface KeyValueScanner {
|
|||
*/
|
||||
public boolean reseek(KeyValue key) throws IOException;
|
||||
|
||||
/**
|
||||
* Similar to {@link #seek} (or {@link #reseek} if forward is true) but only
|
||||
* does a seek operation after checking that it is really necessary for the
|
||||
* row/column combination specified by the kv parameter. This function was
|
||||
* added to avoid unnecessary disk seeks on multi-column get queries using
|
||||
* Bloom filter checking. Should only be used for queries where the set of
|
||||
* columns is specified exactly.
|
||||
*/
|
||||
public boolean seekExactly(KeyValue kv, boolean forward) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the sequence id associated with this KeyValueScanner. This is required
|
||||
* for comparing multiple files to find out which one has the latest data.
|
||||
|
@ -78,4 +68,37 @@ public interface KeyValueScanner {
|
|||
* Close the KeyValue scanner.
|
||||
*/
|
||||
public void close();
|
||||
|
||||
// "Lazy scanner" optimizations
|
||||
|
||||
/**
|
||||
* Similar to {@link #seek} (or {@link #reseek} if forward is true) but only
|
||||
* does a seek operation after checking that it is really necessary for the
|
||||
* row/column combination specified by the kv parameter. This function was
|
||||
* added to avoid unnecessary disk seeks by checking row-column Bloom filters
|
||||
* before a seek on multi-column get/scan queries, and to optimize by looking
|
||||
* up more recent files first.
|
||||
* @param forward do a forward-only "reseek" instead of a random-access seek
|
||||
* @param useBloom whether to enable multi-column Bloom filter optimization
|
||||
*/
|
||||
public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* We optimize our store scanners by checking the most recent store file
|
||||
* first, so we sometimes pretend we have done a seek but delay it until the
|
||||
* store scanner bubbles up to the top of the key-value heap. This method is
|
||||
* then used to ensure the top store file scanner has done a seek operation.
|
||||
*/
|
||||
public boolean realSeekDone();
|
||||
|
||||
/**
|
||||
* Does the real seek operation in case it was skipped by
|
||||
* {@link #seekToRowCol(KeyValue, boolean)}. Note that this function should
|
||||
* be never called on scanners that always do real seek operations (i.e. most
|
||||
* of the scanners). The easiest way to achieve this is to call
|
||||
* {@link #realSeekDone()} first.
|
||||
*/
|
||||
public void enforceSeek() throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -632,7 +632,7 @@ public class MemStore implements HeapSize {
|
|||
* map and snapshot.
|
||||
* This behaves as if it were a real scanner but does not maintain position.
|
||||
*/
|
||||
protected class MemStoreScanner extends AbstractKeyValueScanner {
|
||||
protected class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||
// Next row information for either kvset or snapshot
|
||||
private KeyValue kvsetNextRow = null;
|
||||
private KeyValue snapshotNextRow = null;
|
||||
|
|
|
@ -21,13 +21,35 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
||||
public abstract class AbstractKeyValueScanner implements KeyValueScanner {
|
||||
/**
|
||||
* A "non-lazy" scanner which always does a real seek operation. Most scanners
|
||||
* are inherited from this class.
|
||||
*/
|
||||
public abstract class NonLazyKeyValueScanner implements KeyValueScanner {
|
||||
|
||||
@Override
|
||||
public boolean seekExactly(KeyValue kv, boolean forward) throws IOException {
|
||||
return forward ? reseek(kv) : seek(kv);
|
||||
public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom)
|
||||
throws IOException {
|
||||
return doRealSeek(this, kv, forward);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean realSeekDone() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enforceSeek() throws IOException {
|
||||
throw new NotImplementedException("enforceSeek must not be called on a " +
|
||||
"non-lazy scanner");
|
||||
}
|
||||
|
||||
public static boolean doRealSeek(KeyValueScanner scanner,
|
||||
KeyValue kv, boolean forward) throws IOException {
|
||||
return forward ? scanner.reseek(kv) : scanner.seek(kv);
|
||||
}
|
||||
|
||||
}
|
|
@ -59,12 +59,6 @@ public class ScanQueryMatcher {
|
|||
|
||||
/** Row the query is on */
|
||||
protected byte [] row;
|
||||
|
||||
/**
|
||||
* True if we are only interested in the given exact set of columns. In that
|
||||
* case we can use Bloom filters to avoid unnecessary disk seeks.
|
||||
*/
|
||||
private boolean exactColumnQuery;
|
||||
|
||||
/**
|
||||
* Constructs a ScanQueryMatcher for a Scan.
|
||||
|
@ -95,7 +89,6 @@ public class ScanQueryMatcher {
|
|||
// between rows, not between storefiles.
|
||||
this.columns = new ExplicitColumnTracker(columns, minVersions, maxVersions,
|
||||
ttl);
|
||||
exactColumnQuery = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -313,10 +306,6 @@ public class ScanQueryMatcher {
|
|||
null, 0, 0);
|
||||
}
|
||||
|
||||
public boolean isExactColumnQuery() {
|
||||
return exactColumnQuery;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link #match} return codes. These instruct the scanner moving through
|
||||
* memstores and StoreFiles what to do with the current KeyValue.
|
||||
|
|
|
@ -1376,6 +1376,10 @@ public class StoreFile {
|
|||
void disableBloomFilterForTesting() {
|
||||
bloomFilter = null;
|
||||
}
|
||||
|
||||
public long getMaxTimestamp() {
|
||||
return timeRangeTracker.maximumTimestamp;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -28,11 +28,11 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
|||
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* KeyValueScanner adaptor over the Reader. It also provides hooks into
|
||||
|
@ -46,6 +46,10 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
private final HFileScanner hfs;
|
||||
private KeyValue cur = null;
|
||||
|
||||
private boolean realSeekDone;
|
||||
private boolean delayedReseek;
|
||||
private KeyValue delayedSeekKV;
|
||||
|
||||
private static final AtomicLong seekCount = new AtomicLong();
|
||||
|
||||
/**
|
||||
|
@ -99,12 +103,16 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
public boolean seek(KeyValue key) throws IOException {
|
||||
seekCount.incrementAndGet();
|
||||
try {
|
||||
if(!seekAtOrAfter(hfs, key)) {
|
||||
close();
|
||||
return false;
|
||||
try {
|
||||
if(!seekAtOrAfter(hfs, key)) {
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
cur = hfs.getKeyValue();
|
||||
return true;
|
||||
} finally {
|
||||
realSeekDone = true;
|
||||
}
|
||||
cur = hfs.getKeyValue();
|
||||
return true;
|
||||
} catch(IOException ioe) {
|
||||
throw new IOException("Could not seek " + this, ioe);
|
||||
}
|
||||
|
@ -113,12 +121,16 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
public boolean reseek(KeyValue key) throws IOException {
|
||||
seekCount.incrementAndGet();
|
||||
try {
|
||||
if (!reseekAtOrAfter(hfs, key)) {
|
||||
close();
|
||||
return false;
|
||||
try {
|
||||
if (!reseekAtOrAfter(hfs, key)) {
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
cur = hfs.getKeyValue();
|
||||
return true;
|
||||
} finally {
|
||||
realSeekDone = true;
|
||||
}
|
||||
cur = hfs.getKeyValue();
|
||||
return true;
|
||||
} catch (IOException ioe) {
|
||||
throw new IOException("Could not seek " + this, ioe);
|
||||
}
|
||||
|
@ -174,27 +186,72 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
return reader.getSequenceID();
|
||||
}
|
||||
|
||||
/**
|
||||
* Pretend we have done a seek but don't do it yet, if possible. The hope is
|
||||
* that we find requested columns in more recent files and won't have to seek
|
||||
* in older files. Creates a fake key/value with the given row/column and the
|
||||
* highest (most recent) possible timestamp we might get from this file. When
|
||||
* users of such "lazy scanner" need to know the next KV precisely (e.g. when
|
||||
* this scanner is at the top of the heap), they run {@link #enforceSeek()}.
|
||||
* <p>
|
||||
* Note that this function does guarantee that the current KV of this scanner
|
||||
* will be advanced to at least the given KV. Because of this, it does have
|
||||
* to do a real seek in cases when the seek timestamp is older than the
|
||||
* highest timestamp of the file, e.g. when we are trying to seek to the next
|
||||
* row/column and use OLDEST_TIMESTAMP in the seek key.
|
||||
*/
|
||||
@Override
|
||||
public boolean seekExactly(KeyValue kv, boolean forward)
|
||||
public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom)
|
||||
throws IOException {
|
||||
if (reader.getBloomFilterType() != StoreFile.BloomType.ROWCOL ||
|
||||
kv.getRowLength() == 0 || kv.getQualifierLength() == 0) {
|
||||
return forward ? reseek(kv) : seek(kv);
|
||||
kv.getFamilyLength() == 0) {
|
||||
useBloom = false;
|
||||
}
|
||||
|
||||
boolean isInBloom = reader.passesBloomFilter(kv.getBuffer(),
|
||||
kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
|
||||
kv.getQualifierOffset(), kv.getQualifierLength());
|
||||
if (isInBloom) {
|
||||
// This row/column might be in this store file. Do a normal seek.
|
||||
return forward ? reseek(kv) : seek(kv);
|
||||
boolean haveToSeek = true;
|
||||
if (useBloom) {
|
||||
haveToSeek = reader.passesBloomFilter(kv.getBuffer(),
|
||||
kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
|
||||
kv.getQualifierOffset(), kv.getQualifierLength());
|
||||
}
|
||||
|
||||
delayedReseek = forward;
|
||||
delayedSeekKV = kv;
|
||||
|
||||
if (haveToSeek) {
|
||||
// This row/column might be in this store file (or we did not use the
|
||||
// Bloom filter), so we still need to seek.
|
||||
realSeekDone = false;
|
||||
long maxTimestampInFile = reader.getMaxTimestamp();
|
||||
long seekTimestamp = kv.getTimestamp();
|
||||
if (seekTimestamp > maxTimestampInFile) {
|
||||
// Create a fake key that is not greater than the real next key.
|
||||
// (Lower timestamps correspond to higher KVs.)
|
||||
// To understand this better, consider that we are asked to seek to
|
||||
// a higher timestamp than the max timestamp in this file. We know that
|
||||
// the next point when we have to consider this file again is when we
|
||||
// pass the max timestamp of this file (with the same row/column).
|
||||
cur = kv.createFirstOnRowColTS(maxTimestampInFile);
|
||||
} else {
|
||||
// This will be the case e.g. when we need to seek to the next
|
||||
// row/column, and we don't know exactly what they are, so we set the
|
||||
// seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
|
||||
// row/column.
|
||||
enforceSeek();
|
||||
}
|
||||
return cur != null;
|
||||
}
|
||||
|
||||
// Multi-column Bloom filter optimization.
|
||||
// Create a fake key/value, so that this scanner only bubbles up to the top
|
||||
// of the KeyValueHeap in StoreScanner after we scanned this row/column in
|
||||
// all other store files. The query matcher will then just skip this fake
|
||||
// key/value and the store scanner will progress to the next column.
|
||||
// key/value and the store scanner will progress to the next column. This
|
||||
// is obviously not a "real real" seek, but unlike the fake KV earlier in
|
||||
// this method, we want this to be propagated to ScanQueryMatcher.
|
||||
cur = kv.createLastOnRowCol();
|
||||
|
||||
realSeekDone = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -202,6 +259,23 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
return reader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean realSeekDone() {
|
||||
return realSeekDone;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enforceSeek() throws IOException {
|
||||
if (realSeekDone)
|
||||
return;
|
||||
|
||||
if (delayedReseek) {
|
||||
reseek(delayedSeekKV);
|
||||
} else {
|
||||
seek(delayedSeekKV);
|
||||
}
|
||||
}
|
||||
|
||||
// Test methods
|
||||
|
||||
static final long getSeekCount() {
|
||||
|
|
|
@ -36,7 +36,8 @@ import java.util.NavigableSet;
|
|||
* Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
|
||||
* into List<KeyValue> for a single row.
|
||||
*/
|
||||
class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
|
||||
class StoreScanner extends NonLazyKeyValueScanner
|
||||
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
|
||||
static final Log LOG = LogFactory.getLog(StoreScanner.class);
|
||||
private Store store;
|
||||
private ScanQueryMatcher matcher;
|
||||
|
@ -47,6 +48,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
// Doesnt need to be volatile because it's always accessed via synchronized methods
|
||||
private boolean closing = false;
|
||||
private final boolean isGet;
|
||||
private final boolean explicitColumnQuery;
|
||||
private final boolean useRowColBloom;
|
||||
|
||||
/** We don't ever expect to change this, the constant is just for clarity. */
|
||||
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
|
||||
|
@ -58,6 +61,22 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
// if heap == null and lastTop != null, you need to reseek given the key below
|
||||
private KeyValue lastTop = null;
|
||||
|
||||
/** An internal constructor. */
|
||||
private StoreScanner(Store store, boolean cacheBlocks, Scan scan,
|
||||
final NavigableSet<byte[]> columns){
|
||||
this.store = store;
|
||||
this.cacheBlocks = cacheBlocks;
|
||||
isGet = scan.isGetScan();
|
||||
int numCol = columns == null ? 0 : columns.size();
|
||||
explicitColumnQuery = numCol > 0;
|
||||
|
||||
// We look up row-column Bloom filters for multi-column queries as part of
|
||||
// the seek operation. However, we also look the row-column Bloom filter
|
||||
// for multi-row (non-"get") scans because this is not done in
|
||||
// StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
|
||||
useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Opens a scanner across memstore, snapshot, and all StoreFiles.
|
||||
*
|
||||
|
@ -68,25 +87,25 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
*/
|
||||
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
|
||||
throws IOException {
|
||||
this.store = store;
|
||||
this.cacheBlocks = scan.getCacheBlocks();
|
||||
this(store, scan.getCacheBlocks(), scan, columns);
|
||||
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
|
||||
columns, store.ttl, store.comparator.getRawComparator(),
|
||||
store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
|
||||
false);
|
||||
|
||||
this.isGet = scan.isGetScan();
|
||||
// pass columns = try to filter out unnecessary ScanFiles
|
||||
// Pass columns to try to filter out unnecessary StoreFiles.
|
||||
List<KeyValueScanner> scanners = getScanners(scan, columns);
|
||||
|
||||
// Seek all scanners to the start of the Row (or if the exact matching row
|
||||
// key does not exist, then to the start of the next matching Row).
|
||||
if (matcher.isExactColumnQuery()) {
|
||||
for (KeyValueScanner scanner : scanners)
|
||||
scanner.seekExactly(matcher.getStartKey(), false);
|
||||
if (explicitColumnQuery && lazySeekEnabledGlobally) {
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
scanner.requestSeek(matcher.getStartKey(), false, useRowColBloom);
|
||||
}
|
||||
} else {
|
||||
for (KeyValueScanner scanner : scanners)
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
scanner.seek(matcher.getStartKey());
|
||||
}
|
||||
}
|
||||
|
||||
// Combine all seeked scanners with a heap
|
||||
|
@ -104,11 +123,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
* @param scanners ancilliary scanners
|
||||
*/
|
||||
StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
|
||||
boolean retainDeletesInOutput)
|
||||
throws IOException {
|
||||
this.store = store;
|
||||
this.cacheBlocks = false;
|
||||
this.isGet = false;
|
||||
boolean retainDeletesInOutput) throws IOException {
|
||||
this(store, false, scan, null);
|
||||
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
|
||||
null, store.ttl, store.comparator.getRawComparator(), store.minVersions,
|
||||
store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
|
||||
|
@ -128,9 +144,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
final NavigableSet<byte[]> columns,
|
||||
final List<KeyValueScanner> scanners)
|
||||
throws IOException {
|
||||
this.store = null;
|
||||
this.isGet = false;
|
||||
this.cacheBlocks = scan.getCacheBlocks();
|
||||
this(null, scan.getCacheBlocks(), scan, columns);
|
||||
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
|
||||
comparator.getRawComparator(), 0, scan.getMaxVersions(), false);
|
||||
|
||||
|
@ -238,7 +252,6 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
* @return true if there are more rows, false if scanner is done
|
||||
*/
|
||||
public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
|
||||
//DebugPrint.println("SS.next");
|
||||
|
||||
checkReseek();
|
||||
|
||||
|
@ -280,7 +293,6 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
}
|
||||
prevKV = copyKv;
|
||||
ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv);
|
||||
|
||||
switch(qcode) {
|
||||
case INCLUDE:
|
||||
case INCLUDE_AND_SEEK_NEXT_ROW:
|
||||
|
@ -319,8 +331,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
return false;
|
||||
|
||||
case SEEK_NEXT_ROW:
|
||||
// This is just a relatively simple end of scan fix, to short-cut end us if there is a
|
||||
// endKey in the scan.
|
||||
// This is just a relatively simple end of scan fix, to short-cut end
|
||||
// us if there is an endKey in the scan.
|
||||
if (!matcher.moreRowsMayExistAfter(kv)) {
|
||||
outResult.addAll(results);
|
||||
return false;
|
||||
|
@ -431,8 +443,11 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
public synchronized boolean reseek(KeyValue kv) throws IOException {
|
||||
//Heap cannot be null, because this is only called from next() which
|
||||
//guarantees that heap will never be null before this call.
|
||||
return matcher.isExactColumnQuery() ? heap.seekExactly(kv, true) :
|
||||
heap.reseek(kv);
|
||||
if (explicitColumnQuery && lazySeekEnabledGlobally) {
|
||||
return heap.requestSeek(kv, true, useRowColBloom);
|
||||
} else {
|
||||
return heap.reseek(kv);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -440,11 +455,6 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean seekExactly(KeyValue kv, boolean forward) throws IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Used in testing.
|
||||
* @return all scanners in no particular order
|
||||
|
@ -464,3 +474,4 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -26,14 +26,13 @@ import java.util.List;
|
|||
import java.util.SortedSet;
|
||||
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.regionserver.AbstractKeyValueScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.NonLazyKeyValueScanner;
|
||||
|
||||
/**
|
||||
* Utility scanner that wraps a sortable collection and serves
|
||||
* as a KeyValueScanner.
|
||||
*/
|
||||
public class CollectionBackedScanner extends AbstractKeyValueScanner {
|
||||
public class CollectionBackedScanner extends NonLazyKeyValueScanner {
|
||||
final private Iterable<KeyValue> data;
|
||||
final KeyValue.KVComparator comparator;
|
||||
private Iterator<KeyValue> iter;
|
||||
|
|
|
@ -253,8 +253,8 @@ public class TestBlocksRead extends HBaseTestCase {
|
|||
assertEquals(1, kvs.length);
|
||||
verifyData(kvs[0], "row", "col1", 3);
|
||||
|
||||
// Baseline expected blocks read: 4
|
||||
kvs = getData(FAMILY, "row", Arrays.asList("col1", "col2"), 4);
|
||||
// Expected block reads: 3
|
||||
kvs = getData(FAMILY, "row", Arrays.asList("col1", "col2"), 3);
|
||||
assertEquals(2, kvs.length);
|
||||
verifyData(kvs[0], "row", "col1", 3);
|
||||
verifyData(kvs[1], "row", "col2", 4);
|
||||
|
@ -263,8 +263,8 @@ public class TestBlocksRead extends HBaseTestCase {
|
|||
putData(FAMILY, "row", "col3", 5);
|
||||
region.flushcache();
|
||||
|
||||
// Baseline expected blocks read: 5
|
||||
kvs = getData(FAMILY, "row", "col3", 5);
|
||||
// Baseline expected blocks read: 3
|
||||
kvs = getData(FAMILY, "row", "col3", 3);
|
||||
assertEquals(1, kvs.length);
|
||||
verifyData(kvs[0], "row", "col3", 5);
|
||||
|
||||
|
@ -309,8 +309,8 @@ public class TestBlocksRead extends HBaseTestCase {
|
|||
putData(FAMILY, "row", "col3", 13);
|
||||
region.flushcache();
|
||||
|
||||
// Baseline expected blocks read: 13
|
||||
kvs = getData(FAMILY, "row", Arrays.asList("col1", "col2", "col3"), 13);
|
||||
// Baseline expected blocks read: 9
|
||||
kvs = getData(FAMILY, "row", Arrays.asList("col1", "col2", "col3"), 9);
|
||||
assertEquals(3, kvs.length);
|
||||
verifyData(kvs[0], "row", "col1", 11);
|
||||
verifyData(kvs[1], "row", "col2", 12);
|
||||
|
|
|
@ -533,6 +533,7 @@ public class TestMemStore extends TestCase {
|
|||
* @throws InterruptedException
|
||||
*/
|
||||
public void testGetNextRow() throws Exception {
|
||||
ReadWriteConsistencyControl.resetThreadReadPoint();
|
||||
addRows(this.memstore);
|
||||
// Add more versions to make it a little more interesting.
|
||||
Thread.sleep(1);
|
||||
|
|
Loading…
Reference in New Issue