From cb4fac1d18660094908afd6abaace6e441ec6a49 Mon Sep 17 00:00:00 2001 From: anastas Date: Mon, 27 Mar 2017 15:41:32 +0300 Subject: [PATCH] HBASE-17765 Reviving the merge of the compacting pipeline making the limit on the number of the segments in the pipeline configurable, adding merge test, fixing bug in sizes counting --- .../regionserver/CompactingMemStore.java | 9 +- .../regionserver/CompactionPipeline.java | 8 +- .../hbase/regionserver/ImmutableSegment.java | 4 +- .../hbase/regionserver/MemStoreCompactor.java | 21 +++-- .../TestWalAndCompactingMemStoreFlush.java | 83 +++++++++++-------- 5 files changed, 75 insertions(+), 50 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 26b2f49caa4..0c56693be07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -279,7 +279,8 @@ public class CompactingMemStore extends AbstractMemStore { public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, boolean merge) { - return pipeline.swap(versionedList, result, !merge); + // last true stands for updating the region size + return pipeline.swap(versionedList, result, !merge, true); } /** @@ -437,7 +438,8 @@ public class CompactingMemStore extends AbstractMemStore { private void pushTailToSnapshot() { VersionedSegmentsList segments = pipeline.getVersionedTail(); pushToSnapshot(segments.getStoreSegments()); - pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now + // In Swap: don't close segments (they are in snapshot now) and don't update the region size + pipeline.swap(segments,null,false, false); } private void pushPipelineToSnapshot() { @@ -449,7 +451,8 @@ public class CompactingMemStore extends 
AbstractMemStore { pushToSnapshot(segments.getStoreSegments()); // swap can return false in case the pipeline was updated by ongoing compaction // and the version increase, the chance of it happenning is very low - done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now + // In Swap: don't close segments (they are in snapshot now) and don't update the region size + done = pipeline.swap(segments, null, false, false); if (iterationsCnt>2) { // practically it is impossible that this loop iterates more than two times // (because the compaction is stopped and none restarts it while in snapshot request), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index e64c0fb113a..06e83a38baa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -106,12 +106,16 @@ public class CompactionPipeline { * removed. * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out * During index merge op this will be false and for compaction it will be true. + * @param updateRegionSize whether to update the region size. Update the region size, + * when the pipeline is swapped as part of in-memory-flush and + * further merge/compaction. Don't update the region size when the + * swap is result of the snapshot (flush-to-disk). 
* @return true iff swapped tail with new segment */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT", justification="Increment is done under a synchronize block so safe") public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment, - boolean closeSuffix) { + boolean closeSuffix, boolean updateRegionSize) { if (versionedList.getVersion() != version) { return false; } @@ -135,7 +139,7 @@ public class CompactionPipeline { readOnlyCopy = new LinkedList<>(pipeline); version++; } - if (closeSuffix && region != null) { + if (updateRegionSize && region != null) { // update the global memstore size counter long suffixDataSize = getSegmentsKeySize(suffix); long newDataSize = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index f1273a95fc6..19b66b48889 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -99,7 +99,7 @@ public class ImmutableSegment extends Segment { super(null, // initiailize the CellSet with NULL comparator, memStoreLAB); this.type = type; - // build the true CellSet based on CellArrayMap + // build the new CellSet based on CellArrayMap CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge); this.setCellSet(null, cs); // update the CellSet of the new Segment @@ -203,7 +203,7 @@ public class ImmutableSegment extends Segment { cells[i] = maybeCloneWithAllocator(c); } boolean useMSLAB = (getMemStoreLAB()!=null); - // second parameter true, because in compaction addition of the cell to new segment + // second parameter true, because in compaction/merge the addition of the cell to new segment // is always successful updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell i++; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index dfa7d18ec7d..0d3f47e2ad3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -44,22 +44,26 @@ import java.util.concurrent.atomic.AtomicBoolean; @InterfaceAudience.Private public class MemStoreCompactor { + // The upper bound for the number of segments we store in the pipeline prior to merging. + // This constant is subject to further experimentation. + // The external setting of the compacting MemStore behaviour + public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY = + "hbase.hregion.compacting.pipeline.segments.limit"; + // remaining with the same ("infinity") but configurable default for now + public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 30; + public static final long DEEP_OVERHEAD = ClassSize .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE // compactingMemStore, versionedList, action, isInterrupted (the reference) // "action" is an enum and thus it is a class with static final constants, // so counting only the size of the reference to it and not the size of the internals - + Bytes.SIZEOF_INT // compactionKVMax + + 2 * Bytes.SIZEOF_INT // compactionKVMax, pipelineThreshold + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals) ); - // The upper bound for the number of segments we store in the pipeline prior to merging. - // This constant is subject to further experimentation. 
- private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity - private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); - + private final int pipelineThreshold; // the limit on the number of the segments in the pipeline private CompactingMemStore compactingMemStore; // a static version of the segment list from the pipeline @@ -91,6 +95,9 @@ public class MemStoreCompactor { this.compactionKVMax = compactingMemStore.getConfiguration() .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); initiateAction(compactionPolicy); + pipelineThreshold = // get the limit on the number of the segments in the pipeline + compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY, + COMPACTING_MEMSTORE_THRESHOLD_DEFAULT); } /**---------------------------------------------------------------------- @@ -161,7 +168,7 @@ public class MemStoreCompactor { // compaction shouldn't happen or doesn't worth it // limit the number of the segments in the pipeline int numOfSegments = versionedList.getNumOfSegments(); - if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) { + if (numOfSegments > pipelineThreshold) { LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + " is going to be merged, as there are " + numOfSegments + " segments"); return Action.MERGE; // to avoid too many segments, merge now diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 57eee30e0ea..aae0a4d2f86 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -756,22 +756,24 @@ public class TestWalAndCompactingMemStoreFlush { } @Test(timeout = 180000) - public void 
testSelectiveFlushAndWALinIndexCompaction() throws IOException { + public void testSelectiveFlushWithBasicAndMerge() throws IOException { // Set up the configuration Configuration conf = HBaseConfiguration.create(); - conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class.getName()); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, - 200 * 1024); + 75 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); - // set memstore to do data compaction and not to use the speculative scan + // set memstore to do index compaction with merge conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(MemoryCompactionPolicy.BASIC)); + // length of pipeline that requires merge + conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1); // Intialize the HRegion - HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); - // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf); + // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); if (i <= 100) { @@ -781,7 +783,7 @@ public class TestWalAndCompactingMemStoreFlush { } } } - // Now add more puts for CF2, so that we only flush CF2 to disk + // Now put more entries to CF2 for (int i = 100; i < 2000; i++) { region.put(createPut(2, i)); } @@ -800,13 +802,14 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. 
- assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() - + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); + assertEquals(totalMemstoreSize, + cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + + cf3MemstoreSizePhaseI.getDataSize()); - // Flush! + // Initiate in-memory Flush! ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); - // CF1 and CF3 should be compacted so wait here to be sure the compaction is done + // CF1 and CF3 should be flattened and merged so wait here to be sure the merge is done while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) .isMemStoreFlushingInMemory()) { Threads.sleep(10); @@ -815,21 +818,22 @@ public class TestWalAndCompactingMemStoreFlush { .isMemStoreFlushingInMemory()) { Threads.sleep(10); } + + // Flush-to-disk! CF2 only should be flushed region.flush(false); + MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore(); MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore(); + MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore(); - long smallestSeqInRegionCurrentMemstorePhaseII = region.getWAL() - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); - long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); - long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); - long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); - + // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller + assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize()); + // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same + assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize()); // CF2
should have been cleared assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); - assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize()); - // Add same entries to compact them later + // Add the same amount of entries to see the merging for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); if (i <= 100) { @@ -844,16 +848,12 @@ public class TestWalAndCompactingMemStoreFlush { region.put(createPut(2, i)); } - long smallestSeqInRegionCurrentMemstorePhaseIII = region.getWAL() - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); - long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); - long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2); - long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3); + MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore(); - // Flush! + // Flush in memory! ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); - // CF1 and CF3 should be compacted so wait here to be sure the compaction is done + // CF1 and CF3 should be merged so wait here to be sure the merge is done while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) .isMemStoreFlushingInMemory()) { Threads.sleep(10); @@ -864,17 +864,28 @@ public class TestWalAndCompactingMemStoreFlush { } region.flush(false); - long smallestSeqInRegionCurrentMemstorePhaseIV = region.getWAL() - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); - long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); - long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); - long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); + MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore(); + MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore(); - // now check that the LSN 
of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge - assertFalse( - smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII); - assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); - assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); + assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize()); + assertEquals( + cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(), + cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()); + assertEquals(3, // active, one in pipeline, snapshot + ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size()); + // CF2 should have been cleared + assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes," + + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII + .getDataSize() + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII + .getHeapSize() + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" + + cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" + + cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() + + "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize() + + "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes " + + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" + cf1MemstoreSizePhaseIV + .getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" + cf1MemstoreSizePhaseIV + .getHeapSize() + "\n", + 0, cf2MemstoreSizePhaseIV.getDataSize()); HBaseTestingUtility.closeRegionAndWAL(region); }