diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index f2ef873b7a0..77bf8d0b0e3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -202,6 +202,9 @@ public final class HConstants { /** Parameter name for how often a region should should perform a major compaction */ public static final String MAJOR_COMPACTION_PERIOD = "hbase.hregion.majorcompaction"; + /** Parameter name for the maximum batch of KVs to be used in flushes and compactions */ + public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max"; + /** Parameter name for HBase instance root directory */ public static final String HBASE_DIR = "hbase.rootdir"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java index b606458e6e1..0847d9f7808 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java @@ -110,7 +110,7 @@ class Compactor extends Configured { .getScannersForStoreFiles(filesToCompact, false, false, true); // Get some configs - int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10); + int compactionKVMax = getConf().getInt(HConstants.COMPACTION_KV_MAX, 10); Compression.Algorithm compression = store.getFamily().getCompression(); // Avoid overriding compression setting for major compactions if the user // has not specified it separately diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 6c5fdf6f5d4..9cad04e95b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -674,6 +674,10 @@ public class MemStore implements HeapSize { private KeyValue kvsetNextRow = null; private KeyValue snapshotNextRow = null; + // last iterated KVs for kvset and snapshot (to restore iterator state after reseek) + private KeyValue kvsetItRow = null; + private KeyValue snapshotItRow = null; + // iterator based scanning. private Iterator kvsetIt; private Iterator snapshotIt; @@ -682,10 +686,6 @@ public class MemStore implements HeapSize { private KeyValueSkipListSet kvsetAtCreation; private KeyValueSkipListSet snapshotAtCreation; - // Sub lists on which we're iterating - private SortedSet kvTail; - private SortedSet snapshotTail; - // the pre-calculated KeyValue to be returned by peek() or next() private KeyValue theNext; @@ -717,17 +717,29 @@ public class MemStore implements HeapSize { snapshotAtCreation = snapshot; } - protected KeyValue getNext(Iterator it) { + private KeyValue getNext(Iterator it) { long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); - while (it.hasNext()) { - KeyValue v = it.next(); - if (v.getMemstoreTS() <= readPoint) { - return v; + KeyValue v = null; + try { + while (it.hasNext()) { + v = it.next(); + if (v.getMemstoreTS() <= readPoint) { + return v; + } + } + + return null; + } finally { + if (v != null) { + // in all cases, remember the last KV iterated to + if (it == snapshotIt) { + snapshotItRow = v; + } else { + kvsetItRow = v; + } } } - - return null; } /** @@ -746,8 +758,10 @@ public class MemStore implements HeapSize { // kvset and snapshot will never be null. // if tailSet can't find anything, SortedSet is empty (not null). - kvTail = kvsetAtCreation.tailSet(key); - snapshotTail = snapshotAtCreation.tailSet(key); + kvsetIt = kvsetAtCreation.tailSet(key).iterator(); + snapshotIt = snapshotAtCreation.tailSet(key).iterator(); + kvsetItRow = null; + snapshotItRow = null; return seekInSubLists(key); } @@ -757,9 +771,6 @@ public class MemStore implements HeapSize { * (Re)initialize the iterators after a seek or a reseek. */ private synchronized boolean seekInSubLists(KeyValue key){ - kvsetIt = kvTail.iterator(); - snapshotIt = snapshotTail.iterator(); - kvsetNextRow = getNext(kvsetIt); snapshotNextRow = getNext(snapshotIt); @@ -779,25 +790,20 @@ public class MemStore implements HeapSize { @Override public synchronized boolean reseek(KeyValue key) { /* - See HBASE-4195 & HBASE-3855 for the background on this implementation. + See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. This code is executed concurrently with flush and puts, without locks. Two points must be known when working on this code: 1) It's not possible to use the 'kvTail' and 'snapshot' variables, as they are modified during a flush. - 2) The ideal implementation for performances would use the sub skip list + 2) The ideal implementation for performance would use the sub skip list implicitly pointed by the iterators 'kvsetIt' and 'snapshotIt'. Unfortunately the Java API does not offer a method to - get it. So we're using the skip list that we kept when we created - the iterators. As these iterators could have been moved forward after - their creation, we're doing a kind of rewind here. It has a small - performance impact (we're using a wider list than necessary), and we - could see values that were not here when we read the list the first - time. We expect that the new values will be skipped by the test on - readpoint performed in the next() function. + get it. So we remember the last keys we iterated to and restore + the reseeked set to at least that point. */ - kvTail = kvTail.tailSet(key); - snapshotTail = snapshotTail.tailSet(key); + kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator(); + snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator(); return seekInSubLists(key); } @@ -838,7 +844,7 @@ public class MemStore implements HeapSize { * This uses comparator.compare() to compare the KeyValue using the memstore * comparator. */ - protected KeyValue getLowest(KeyValue first, KeyValue second) { + private KeyValue getLowest(KeyValue first, KeyValue second) { if (first == null && second == null) { return null; } @@ -849,12 +855,31 @@ public class MemStore implements HeapSize { return (first != null ? first : second); } + /* + * Returns the higher of the two key values, or null if they are both null. + * This uses comparator.compare() to compare the KeyValue using the memstore + * comparator. + */ + private KeyValue getHighest(KeyValue first, KeyValue second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = comparator.compare(first, second); + return (compare > 0 ? first : second); + } + return (first != null ? first : second); + } + public synchronized void close() { this.kvsetNextRow = null; this.snapshotNextRow = null; this.kvsetIt = null; this.snapshotIt = null; + + this.kvsetItRow = null; + this.snapshotItRow = null; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index caf909d0061..e888d16417f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -705,9 +705,9 @@ public class Store extends SchemaConfigured implements HStore { if (scanner == null) { Scan scan = new Scan(); scan.setMaxVersions(scanInfo.getMaxVersions()); - scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner( - set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + scanner = new StoreScanner(this, scanInfo, scan, + Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT, + this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } if (getHRegion().getCoprocessorHost() != null) { InternalScanner cpScanner = @@ -719,6 +719,7 @@ public class Store extends SchemaConfigured implements HStore { scanner = cpScanner; } try { + int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10); // TODO: We can fail in the below block before we complete adding this // flush to list of store files. Add cleanup of anything put on filesystem // if we fail. @@ -732,7 +733,7 @@ public class Store extends SchemaConfigured implements HStore { List kvs = new ArrayList(); boolean hasMore; do { - hasMore = scanner.next(kvs); + hasMore = scanner.next(kvs, compactionKVMax); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { // If we know that this KV is going to be included always, then let us