HBASE-4195 Possible inconsistency in a memstore read after a reseek, possible performance improvement
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1171746 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
879ab1b8fb
commit
6571376717
|
@ -277,6 +277,8 @@ Release 0.91.0 - Unreleased
|
|||
(Jon Hsieh)
|
||||
HBASE-4417 HBaseAdmin.checkHBaseAvailable() doesn't close ZooKeeper connections
|
||||
(Stefan Seelmann)
|
||||
HBASE-4195 Possible inconsistency in a memstore read after a reseek,
|
||||
possible performance improvement (nkeywal)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
||||
|
|
|
@ -62,10 +62,6 @@ public class MemStore implements HeapSize {
|
|||
"hbase.hregion.memstore.mslab.enabled";
|
||||
private static final boolean USEMSLAB_DEFAULT = false;
|
||||
|
||||
static final String RESEEKMAX_KEY =
|
||||
"hbase.hregion.memstore.reseek.maxkeys";
|
||||
private static final int RESEEKMAX_DEFAULT = 32;
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
// MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
|
||||
|
@ -96,10 +92,7 @@ public class MemStore implements HeapSize {
|
|||
|
||||
MemStoreLAB allocator;
|
||||
|
||||
// if a reseek has to scan over more than this number of keys, then
|
||||
// it morphs into a seek. A seek does a tree map-search while
|
||||
// reseek does a linear scan.
|
||||
int reseekNumKeys;
|
||||
|
||||
|
||||
/**
|
||||
* Default constructor. Used for tests.
|
||||
|
@ -129,7 +122,6 @@ public class MemStore implements HeapSize {
|
|||
} else {
|
||||
this.allocator = null;
|
||||
}
|
||||
this.reseekNumKeys = conf.getInt(RESEEKMAX_KEY, RESEEKMAX_DEFAULT);
|
||||
}
|
||||
|
||||
void dump() {
|
||||
|
@ -646,12 +638,16 @@ public class MemStore implements HeapSize {
|
|||
private KeyValue snapshotNextRow = null;
|
||||
|
||||
// iterator based scanning.
|
||||
Iterator<KeyValue> kvsetIt;
|
||||
Iterator<KeyValue> snapshotIt;
|
||||
private Iterator<KeyValue> kvsetIt;
|
||||
private Iterator<KeyValue> snapshotIt;
|
||||
|
||||
// Sub lists on which we're iterating
|
||||
private SortedSet<KeyValue> kvTail;
|
||||
private SortedSet<KeyValue> snapshotTail;
|
||||
|
||||
// the pre-calculated KeyValue to be returned by peek() or next()
|
||||
private KeyValue theNext;
|
||||
|
||||
// number of iterations in this reseek operation
|
||||
int numIterReseek;
|
||||
|
||||
/*
|
||||
Some notes...
|
||||
|
||||
|
@ -668,105 +664,115 @@ public class MemStore implements HeapSize {
|
|||
become available but we will never see them. This needs to be handled at the
|
||||
StoreScanner level with coordination with MemStoreScanner.
|
||||
|
||||
Currently, this problem is only partly managed: during the small amount of time
|
||||
when the StoreScanner has not yet created a new MemStoreScanner, we will miss
|
||||
the adds to kvset in the MemStoreScanner.
|
||||
*/
|
||||
|
||||
MemStoreScanner() {
|
||||
super();
|
||||
|
||||
//DebugPrint.println(" MS new@" + hashCode());
|
||||
}
|
||||
|
||||
protected KeyValue getNext(Iterator<KeyValue> it) {
|
||||
KeyValue ret = null;
|
||||
long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
|
||||
//DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
|
||||
|
||||
while (ret == null && it.hasNext()) {
|
||||
while (it.hasNext()) {
|
||||
KeyValue v = it.next();
|
||||
if (v.getMemstoreTS() <= readPoint) {
|
||||
// keep it.
|
||||
ret = v;
|
||||
}
|
||||
numIterReseek--;
|
||||
if (numIterReseek == 0) {
|
||||
break;
|
||||
return v;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the scanner at the seek key.
|
||||
* Must be called only once: there is no thread safety between the scanner
|
||||
* and the memStore.
|
||||
* @param key seek value
|
||||
* @return false if the key is null or if there is no data
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean seek(KeyValue key) {
|
||||
if (key == null) {
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
numIterReseek = 0;
|
||||
|
||||
// kvset and snapshot will never be empty.
|
||||
// if tailSet can't find anything, the SortedSet is empty (not null).
|
||||
SortedSet<KeyValue> kvTail = kvset.tailSet(key);
|
||||
SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
|
||||
// kvset and snapshot will never be null.
|
||||
// if tailSet can't find anything, SortedSet is empty (not null).
|
||||
kvTail = kvset.tailSet(key);
|
||||
snapshotTail = snapshot.tailSet(key);
|
||||
|
||||
return seekInSubLists(key);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* (Re)initialize the iterators after a seek or a reseek.
|
||||
*/
|
||||
private synchronized boolean seekInSubLists(KeyValue key){
|
||||
kvsetIt = kvTail.iterator();
|
||||
snapshotIt = snapshotTail.iterator();
|
||||
|
||||
kvsetNextRow = getNext(kvsetIt);
|
||||
snapshotNextRow = getNext(snapshotIt);
|
||||
|
||||
// Calculate the next value
|
||||
theNext = getLowest(kvsetNextRow, snapshotNextRow);
|
||||
|
||||
//long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
|
||||
//DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
|
||||
// kvset.size() + " threadread = " + readPoint);
|
||||
//DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
|
||||
// snapshot.size() + " threadread = " + readPoint);
|
||||
// has data
|
||||
return (theNext != null);
|
||||
}
|
||||
|
||||
|
||||
KeyValue lowest = getLowest();
|
||||
/**
|
||||
* Move forward on the sub-lists set previously by seek.
|
||||
* @param key seek value (should be non-null)
|
||||
* @return true if there is at least one KV to read, false otherwise
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean reseek(KeyValue key) {
|
||||
/*
|
||||
See HBASE-4195 & HBASE-3855 for the background on this implementation.
|
||||
This code is executed concurrently with flush and puts, without locks.
|
||||
Two points must be known when working on this code:
|
||||
1) It's not possible to use the 'kvTail' and 'snapshot'
|
||||
variables, as they are modified during a flush.
|
||||
2) The ideal implementation for performance would use the sub skip list
|
||||
implicitly pointed by the iterators 'kvsetIt' and
|
||||
'snapshotIt'. Unfortunately the Java API does not offer a method to
|
||||
get it. So we're using the skip list that we kept when we created
|
||||
the iterators. As these iterators could have been moved forward after
|
||||
their creation, we're doing a kind of rewind here. It has a small
|
||||
performance impact (we're using a wider list than necessary), and we
|
||||
could see values that were not here when we read the list the first
|
||||
time. We expect that the new values will be skipped by the test on
|
||||
read point performed in getNext(), which next() calls.
|
||||
*/
|
||||
|
||||
// has data := (lowest != null)
|
||||
return lowest != null;
|
||||
kvTail = kvTail.tailSet(key);
|
||||
snapshotTail = snapshotTail.tailSet(key);
|
||||
|
||||
return seekInSubLists(key);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized KeyValue peek() {
|
||||
//DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
|
||||
return theNext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean reseek(KeyValue key) {
|
||||
numIterReseek = reseekNumKeys;
|
||||
while (kvsetNextRow != null &&
|
||||
comparator.compare(kvsetNextRow, key) < 0) {
|
||||
kvsetNextRow = getNext(kvsetIt);
|
||||
// if we scanned enough entries but still not able to find the
|
||||
// kv we are looking for, better cut our costs and do a tree
|
||||
// scan using seek.
|
||||
if (kvsetNextRow == null && numIterReseek == 0) {
|
||||
return seek(key);
|
||||
}
|
||||
}
|
||||
|
||||
while (snapshotNextRow != null &&
|
||||
comparator.compare(snapshotNextRow, key) < 0) {
|
||||
snapshotNextRow = getNext(snapshotIt);
|
||||
// if we scanned enough entries but still not able to find the
|
||||
// kv we are looking for, better cut our costs and do a tree
|
||||
// scan using seek.
|
||||
if (snapshotNextRow == null && numIterReseek == 0) {
|
||||
return seek(key);
|
||||
}
|
||||
}
|
||||
return (kvsetNextRow != null || snapshotNextRow != null);
|
||||
}
|
||||
|
||||
public synchronized KeyValue peek() {
|
||||
//DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
|
||||
return getLowest();
|
||||
}
|
||||
|
||||
|
||||
public synchronized KeyValue next() {
|
||||
KeyValue theNext = getLowest();
|
||||
|
||||
if (theNext == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final KeyValue ret = theNext;
|
||||
|
||||
// Advance one of the iterators
|
||||
if (theNext == kvsetNextRow) {
|
||||
kvsetNextRow = getNext(kvsetIt);
|
||||
|
@ -774,15 +780,13 @@ public class MemStore implements HeapSize {
|
|||
snapshotNextRow = getNext(snapshotIt);
|
||||
}
|
||||
|
||||
// Calculate the next value
|
||||
theNext = getLowest(kvsetNextRow, snapshotNextRow);
|
||||
|
||||
//long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
|
||||
//DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
|
||||
// getLowest() + " threadpoint=" + readpoint);
|
||||
return theNext;
|
||||
}
|
||||
|
||||
protected KeyValue getLowest() {
|
||||
return getLower(kvsetNextRow,
|
||||
snapshotNextRow);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -790,7 +794,7 @@ public class MemStore implements HeapSize {
|
|||
* This uses comparator.compare() to compare the KeyValue using the memstore
|
||||
* comparator.
|
||||
*/
|
||||
protected KeyValue getLower(KeyValue first, KeyValue second) {
|
||||
protected KeyValue getLowest(KeyValue first, KeyValue second) {
|
||||
if (first == null && second == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -820,7 +824,7 @@ public class MemStore implements HeapSize {
|
|||
}
|
||||
|
||||
public final static long FIXED_OVERHEAD = ClassSize.align(
|
||||
ClassSize.OBJECT + (12 * ClassSize.REFERENCE));
|
||||
ClassSize.OBJECT + (11 * ClassSize.REFERENCE));
|
||||
|
||||
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
|
||||
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
|
||||
|
|
|
@ -2665,10 +2665,10 @@ public class TestHRegion extends HBaseTestCase {
|
|||
for (int r = 0; r < numRows; r++) {
|
||||
byte[] row = Bytes.toBytes("row" + r);
|
||||
Put put = new Put(row);
|
||||
byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
|
||||
for (byte[] family : families) {
|
||||
for (byte[] qualifier : qualifiers) {
|
||||
put.add(family, qualifier, (long) numPutsFinished,
|
||||
Bytes.toBytes(numPutsFinished));
|
||||
put.add(family, qualifier, (long) numPutsFinished, value);
|
||||
}
|
||||
}
|
||||
// System.out.println("Putting of kvsetsize=" + put.size());
|
||||
|
@ -2760,14 +2760,22 @@ public class TestHRegion extends HBaseTestCase {
|
|||
}
|
||||
assertTrue(timestamp >= prevTimestamp);
|
||||
prevTimestamp = timestamp;
|
||||
KeyValue previousKV = null;
|
||||
|
||||
byte [] gotValue = null;
|
||||
for (KeyValue kv : result.raw()) {
|
||||
byte [] thisValue = kv.getValue();
|
||||
if (gotValue != null) {
|
||||
assertEquals(gotValue, thisValue);
|
||||
byte[] thisValue = kv.getValue();
|
||||
if (previousKV != null) {
|
||||
if (Bytes.compareTo(previousKV.getValue(), thisValue) != 0) {
|
||||
LOG.warn("These two KV should have the same value." +
|
||||
" Previous KV:" +
|
||||
previousKV + "(memStoreTS:" + previousKV.getMemstoreTS() + ")" +
|
||||
", New KV: " +
|
||||
kv + "(memStoreTS:" + kv.getMemstoreTS() + ")"
|
||||
);
|
||||
assertEquals(previousKV.getValue(), thisValue);
|
||||
}
|
||||
}
|
||||
gotValue = thisValue;
|
||||
previousKV = kv;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue