HBASE-6561 Gets/Puts with many columns send the RegionServer into an 'endless' loop
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1373943 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ea61bc702b
commit
001c445d75
|
@ -202,6 +202,9 @@ public final class HConstants {
|
|||
/** Parameter name for how often a region should should perform a major compaction */
|
||||
public static final String MAJOR_COMPACTION_PERIOD = "hbase.hregion.majorcompaction";
|
||||
|
||||
/** Parameter name for the maximum batch of KVs to be used in flushes and compactions */
|
||||
public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max";
|
||||
|
||||
/** Parameter name for HBase instance root directory */
|
||||
public static final String HBASE_DIR = "hbase.rootdir";
|
||||
|
||||
|
|
|
@ -110,7 +110,7 @@ class Compactor extends Configured {
|
|||
.getScannersForStoreFiles(filesToCompact, false, false, true);
|
||||
|
||||
// Get some configs
|
||||
int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
|
||||
int compactionKVMax = getConf().getInt(HConstants.COMPACTION_KV_MAX, 10);
|
||||
Compression.Algorithm compression = store.getFamily().getCompression();
|
||||
// Avoid overriding compression setting for major compactions if the user
|
||||
// has not specified it separately
|
||||
|
|
|
@ -674,6 +674,10 @@ public class MemStore implements HeapSize {
|
|||
private KeyValue kvsetNextRow = null;
|
||||
private KeyValue snapshotNextRow = null;
|
||||
|
||||
// last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
|
||||
private KeyValue kvsetItRow = null;
|
||||
private KeyValue snapshotItRow = null;
|
||||
|
||||
// iterator based scanning.
|
||||
private Iterator<KeyValue> kvsetIt;
|
||||
private Iterator<KeyValue> snapshotIt;
|
||||
|
@ -682,10 +686,6 @@ public class MemStore implements HeapSize {
|
|||
private KeyValueSkipListSet kvsetAtCreation;
|
||||
private KeyValueSkipListSet snapshotAtCreation;
|
||||
|
||||
// Sub lists on which we're iterating
|
||||
private SortedSet<KeyValue> kvTail;
|
||||
private SortedSet<KeyValue> snapshotTail;
|
||||
|
||||
// the pre-calculated KeyValue to be returned by peek() or next()
|
||||
private KeyValue theNext;
|
||||
|
||||
|
@ -717,17 +717,29 @@ public class MemStore implements HeapSize {
|
|||
snapshotAtCreation = snapshot;
|
||||
}
|
||||
|
||||
protected KeyValue getNext(Iterator<KeyValue> it) {
|
||||
private KeyValue getNext(Iterator<KeyValue> it) {
|
||||
long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
|
||||
|
||||
KeyValue v = null;
|
||||
try {
|
||||
while (it.hasNext()) {
|
||||
KeyValue v = it.next();
|
||||
v = it.next();
|
||||
if (v.getMemstoreTS() <= readPoint) {
|
||||
return v;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
} finally {
|
||||
if (v != null) {
|
||||
// in all cases, remember the last KV iterated to
|
||||
if (it == snapshotIt) {
|
||||
snapshotItRow = v;
|
||||
} else {
|
||||
kvsetItRow = v;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -746,8 +758,10 @@ public class MemStore implements HeapSize {
|
|||
|
||||
// kvset and snapshot will never be null.
|
||||
// if tailSet can't find anything, SortedSet is empty (not null).
|
||||
kvTail = kvsetAtCreation.tailSet(key);
|
||||
snapshotTail = snapshotAtCreation.tailSet(key);
|
||||
kvsetIt = kvsetAtCreation.tailSet(key).iterator();
|
||||
snapshotIt = snapshotAtCreation.tailSet(key).iterator();
|
||||
kvsetItRow = null;
|
||||
snapshotItRow = null;
|
||||
|
||||
return seekInSubLists(key);
|
||||
}
|
||||
|
@ -757,9 +771,6 @@ public class MemStore implements HeapSize {
|
|||
* (Re)initialize the iterators after a seek or a reseek.
|
||||
*/
|
||||
private synchronized boolean seekInSubLists(KeyValue key){
|
||||
kvsetIt = kvTail.iterator();
|
||||
snapshotIt = snapshotTail.iterator();
|
||||
|
||||
kvsetNextRow = getNext(kvsetIt);
|
||||
snapshotNextRow = getNext(snapshotIt);
|
||||
|
||||
|
@ -779,25 +790,20 @@ public class MemStore implements HeapSize {
|
|||
@Override
|
||||
public synchronized boolean reseek(KeyValue key) {
|
||||
/*
|
||||
See HBASE-4195 & HBASE-3855 for the background on this implementation.
|
||||
See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
|
||||
This code is executed concurrently with flush and puts, without locks.
|
||||
Two points must be known when working on this code:
|
||||
1) It's not possible to use the 'kvTail' and 'snapshot'
|
||||
variables, as they are modified during a flush.
|
||||
2) The ideal implementation for performances would use the sub skip list
|
||||
2) The ideal implementation for performance would use the sub skip list
|
||||
implicitly pointed by the iterators 'kvsetIt' and
|
||||
'snapshotIt'. Unfortunately the Java API does not offer a method to
|
||||
get it. So we're using the skip list that we kept when we created
|
||||
the iterators. As these iterators could have been moved forward after
|
||||
their creation, we're doing a kind of rewind here. It has a small
|
||||
performance impact (we're using a wider list than necessary), and we
|
||||
could see values that were not here when we read the list the first
|
||||
time. We expect that the new values will be skipped by the test on
|
||||
readpoint performed in the next() function.
|
||||
get it. So we remember the last keys we iterated to and restore
|
||||
the reseeked set to at least that point.
|
||||
*/
|
||||
|
||||
kvTail = kvTail.tailSet(key);
|
||||
snapshotTail = snapshotTail.tailSet(key);
|
||||
kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
|
||||
snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
|
||||
|
||||
return seekInSubLists(key);
|
||||
}
|
||||
|
@ -838,7 +844,7 @@ public class MemStore implements HeapSize {
|
|||
* This uses comparator.compare() to compare the KeyValue using the memstore
|
||||
* comparator.
|
||||
*/
|
||||
protected KeyValue getLowest(KeyValue first, KeyValue second) {
|
||||
private KeyValue getLowest(KeyValue first, KeyValue second) {
|
||||
if (first == null && second == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -849,12 +855,31 @@ public class MemStore implements HeapSize {
|
|||
return (first != null ? first : second);
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns the higher of the two key values, or null if they are both null.
|
||||
* This uses comparator.compare() to compare the KeyValue using the memstore
|
||||
* comparator.
|
||||
*/
|
||||
private KeyValue getHighest(KeyValue first, KeyValue second) {
|
||||
if (first == null && second == null) {
|
||||
return null;
|
||||
}
|
||||
if (first != null && second != null) {
|
||||
int compare = comparator.compare(first, second);
|
||||
return (compare > 0 ? first : second);
|
||||
}
|
||||
return (first != null ? first : second);
|
||||
}
|
||||
|
||||
public synchronized void close() {
|
||||
this.kvsetNextRow = null;
|
||||
this.snapshotNextRow = null;
|
||||
|
||||
this.kvsetIt = null;
|
||||
this.snapshotIt = null;
|
||||
|
||||
this.kvsetItRow = null;
|
||||
this.snapshotItRow = null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -705,9 +705,9 @@ public class Store extends SchemaConfigured implements HStore {
|
|||
if (scanner == null) {
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(scanInfo.getMaxVersions());
|
||||
scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner(
|
||||
set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
|
||||
HConstants.OLDEST_TIMESTAMP);
|
||||
scanner = new StoreScanner(this, scanInfo, scan,
|
||||
Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT,
|
||||
this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
|
||||
}
|
||||
if (getHRegion().getCoprocessorHost() != null) {
|
||||
InternalScanner cpScanner =
|
||||
|
@ -719,6 +719,7 @@ public class Store extends SchemaConfigured implements HStore {
|
|||
scanner = cpScanner;
|
||||
}
|
||||
try {
|
||||
int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
|
||||
// TODO: We can fail in the below block before we complete adding this
|
||||
// flush to list of store files. Add cleanup of anything put on filesystem
|
||||
// if we fail.
|
||||
|
@ -732,7 +733,7 @@ public class Store extends SchemaConfigured implements HStore {
|
|||
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||
boolean hasMore;
|
||||
do {
|
||||
hasMore = scanner.next(kvs);
|
||||
hasMore = scanner.next(kvs, compactionKVMax);
|
||||
if (!kvs.isEmpty()) {
|
||||
for (KeyValue kv : kvs) {
|
||||
// If we know that this KV is going to be included always, then let us
|
||||
|
|
Loading…
Reference in New Issue