diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 0c56693be07..26b2f49caa4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -279,8 +279,7 @@ public class CompactingMemStore extends AbstractMemStore { public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, boolean merge) { - // last true stands for updating the region size - return pipeline.swap(versionedList, result, !merge, true); + return pipeline.swap(versionedList, result, !merge); } /** @@ -438,8 +437,7 @@ public class CompactingMemStore extends AbstractMemStore { private void pushTailToSnapshot() { VersionedSegmentsList segments = pipeline.getVersionedTail(); pushToSnapshot(segments.getStoreSegments()); - // In Swap: don't close segments (they are in snapshot now) and don't update the region size - pipeline.swap(segments,null,false, false); + pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now } private void pushPipelineToSnapshot() { @@ -451,8 +449,7 @@ public class CompactingMemStore extends AbstractMemStore { pushToSnapshot(segments.getStoreSegments()); // swap can return false in case the pipeline was updated by ongoing compaction // and the version increase, the chance of it happenning is very low - // In Swap: don't close segments (they are in snapshot now) and don't update the region size - done = pipeline.swap(segments, null, false, false); + done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now if (iterationsCnt>2) { // practically it is impossible that this loop iterates more than two times // (because the compaction is stopped and none restarts it while in snapshot request), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 06e83a38baa..e64c0fb113a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -106,16 +106,12 @@ public class CompactionPipeline { * removed. * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out * During index merge op this will be false and for compaction it will be true. - * @param updateRegionSize whether to update the region size. Update the region size, - * when the pipeline is swapped as part of in-memory-flush and - * further merge/compaction. Don't update the region size when the - * swap is result of the snapshot (flush-to-disk). * @return true iff swapped tail with new segment */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT", justification="Increment is done under a synchronize block so safe") public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment, - boolean closeSuffix, boolean updateRegionSize) { + boolean closeSuffix) { if (versionedList.getVersion() != version) { return false; } @@ -139,7 +135,7 @@ public class CompactionPipeline { readOnlyCopy = new LinkedList<>(pipeline); version++; } - if (updateRegionSize && region != null) { + if (closeSuffix && region != null) { // update the global memstore size counter long suffixDataSize = getSegmentsKeySize(suffix); long newDataSize = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 19b66b48889..f1273a95fc6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -99,7 +99,7 @@ public class ImmutableSegment extends Segment { super(null, // initiailize the CellSet with NULL comparator, memStoreLAB); this.type = type; - // build the new CellSet based on CellArrayMap + // build the true CellSet based on CellArrayMap CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge); this.setCellSet(null, cs); // update the CellSet of the new Segment @@ -203,7 +203,7 @@ public class ImmutableSegment extends Segment { cells[i] = maybeCloneWithAllocator(c); } boolean useMSLAB = (getMemStoreLAB()!=null); - // second parameter true, because in compaction/merge the addition of the cell to new segment + // second parameter true, because in compaction addition of the cell to new segment // is always successful updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell i++; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 0d3f47e2ad3..dfa7d18ec7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -44,26 +44,22 @@ import java.util.concurrent.atomic.AtomicBoolean; @InterfaceAudience.Private public class MemStoreCompactor { - // The upper bound for the number of segments we store in the pipeline prior to merging. - // This constant is subject to further experimentation. - // The external setting of the compacting MemStore behaviour - public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY = - "hbase.hregion.compacting.pipeline.segments.limit"; - // remaining with the same ("infinity") but configurable default for now - public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 30; - public static final long DEEP_OVERHEAD = ClassSize .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE // compactingMemStore, versionedList, action, isInterrupted (the reference) // "action" is an enum and thus it is a class with static final constants, // so counting only the size of the reference to it and not the size of the internals - + 2 * Bytes.SIZEOF_INT // compactionKVMax, pipelineThreshold + + Bytes.SIZEOF_INT // compactionKVMax + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals) ); + // The upper bound for the number of segments we store in the pipeline prior to merging. + // This constant is subject to further experimentation. + private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity + private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); - private final int pipelineThreshold; // the limit on the number of the segments in the pipeline + private CompactingMemStore compactingMemStore; // a static version of the segment list from the pipeline @@ -95,9 +91,6 @@ public class MemStoreCompactor { this.compactionKVMax = compactingMemStore.getConfiguration() .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); initiateAction(compactionPolicy); - pipelineThreshold = // get the limit on the number of the segments in the pipeline - compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY, - COMPACTING_MEMSTORE_THRESHOLD_DEFAULT); } /**---------------------------------------------------------------------- @@ -168,7 +161,7 @@ public class MemStoreCompactor { // compaction shouldn't happen or doesn't worth it // limit the number of the segments in the pipeline int numOfSegments = versionedList.getNumOfSegments(); - if (numOfSegments > pipelineThreshold) { + if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) { LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + " is going to be merged, as there are " + numOfSegments + " segments"); return Action.MERGE; // to avoid too many segments, merge now diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index aae0a4d2f86..57eee30e0ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -756,24 +756,22 @@ public class TestWalAndCompactingMemStoreFlush { } @Test(timeout = 180000) - public void testSelectiveFlushWithBasicAndMerge() throws IOException { + public void testSelectiveFlushAndWALinIndexCompaction() throws IOException { // Set up the configuration Configuration conf = HBaseConfiguration.create(); - conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class.getName()); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, - 75 * 1024); + 200 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); - // set memstore to do index compaction with merge + // set memstore to do data compaction and not to use the speculative scan conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(MemoryCompactionPolicy.BASIC)); - // length of pipeline that requires merge - conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1); // Intialize the HRegion - HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf); - // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3 + HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); if (i <= 100) { @@ -783,7 +781,7 @@ public class TestWalAndCompactingMemStoreFlush { } } } - // Now put more entries to CF2 + // Now add more puts for CF2, so that we only flush CF2 to disk for (int i = 100; i < 2000; i++) { region.put(createPut(2, i)); } @@ -802,14 +800,13 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. - assertEquals(totalMemstoreSize, - cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() - + cf3MemstoreSizePhaseI.getDataSize()); + assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() + + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); - // Initiate in-memory Flush! + // Flush! ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); - // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done + // CF1 and CF3 should be compacted so wait here to be sure the compaction is done while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) .isMemStoreFlushingInMemory()) { Threads.sleep(10); @@ -818,22 +815,21 @@ public class TestWalAndCompactingMemStoreFlush { .isMemStoreFlushingInMemory()) { Threads.sleep(10); } - - // Flush-to-disk! CF2 only should be flushed region.flush(false); - MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore(); MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore(); - MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore(); - // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller - assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize()); - // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same - assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize()); + long smallestSeqInRegionCurrentMemstorePhaseII = region.getWAL() + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); + // CF2 should have been cleared assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); + assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize()); - // Add the same amount of entries to see the merging + // Add same entries to compact them later for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); if (i <= 100) { @@ -848,12 +844,16 @@ public class TestWalAndCompactingMemStoreFlush { region.put(createPut(2, i)); } - MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore(); + long smallestSeqInRegionCurrentMemstorePhaseIII = region.getWAL() + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3); - // Flush in memory! + // Flush! ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); - // CF1 and CF3 should be merged so wait here to be sure the merge is done + // CF1 and CF3 should be compacted so wait here to be sure the compaction is done while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) .isMemStoreFlushingInMemory()) { Threads.sleep(10); @@ -864,28 +864,17 @@ public class TestWalAndCompactingMemStoreFlush { } region.flush(false); - MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore(); - MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore(); + long smallestSeqInRegionCurrentMemstorePhaseIV = region.getWAL() + .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); - assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize()); - assertEquals( - cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(), - cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()); - assertEquals(3, // active, one in pipeline, snapshot - ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size()); - // CF2 should have been cleared - assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes," - + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII - .getDataSize() + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII - .getHeapSize() + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" - + cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" - + cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() - + "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize() - + "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes " - + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" + cf1MemstoreSizePhaseIV - .getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" + cf1MemstoreSizePhaseIV - .getHeapSize() + "\n", - 0, cf2MemstoreSizePhaseIV.getDataSize()); + // now check that the LSN of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge + assertFalse( + smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII); + assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); + assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); HBaseTestingUtility.closeRegionAndWAL(region); }