Revert "Reviving the merge of the compacting pipeline: making the limit on the number of the segments in the pipeline configurable, adding merge test, fixing bug in sizes counting"

This reverts commit c77e2135db.

Bad commit message
This commit is contained in:
Sean Busbey 2017-03-28 10:53:41 -05:00
parent ea566e7c54
commit 046d4e183b
5 changed files with 50 additions and 75 deletions

View File

@ -279,8 +279,7 @@ public class CompactingMemStore extends AbstractMemStore {
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
boolean merge) { boolean merge) {
// last true stands for updating the region size return pipeline.swap(versionedList, result, !merge);
return pipeline.swap(versionedList, result, !merge, true);
} }
/** /**
@ -438,8 +437,7 @@ public class CompactingMemStore extends AbstractMemStore {
private void pushTailToSnapshot() { private void pushTailToSnapshot() {
VersionedSegmentsList segments = pipeline.getVersionedTail(); VersionedSegmentsList segments = pipeline.getVersionedTail();
pushToSnapshot(segments.getStoreSegments()); pushToSnapshot(segments.getStoreSegments());
// In Swap: don't close segments (they are in snapshot now) and don't update the region size pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
pipeline.swap(segments,null,false, false);
} }
private void pushPipelineToSnapshot() { private void pushPipelineToSnapshot() {
@ -451,8 +449,7 @@ public class CompactingMemStore extends AbstractMemStore {
pushToSnapshot(segments.getStoreSegments()); pushToSnapshot(segments.getStoreSegments());
// swap can return false in case the pipeline was updated by ongoing compaction // swap can return false in case the pipeline was updated by ongoing compaction
// and the version increase, the chance of it happenning is very low // and the version increase, the chance of it happenning is very low
// In Swap: don't close segments (they are in snapshot now) and don't update the region size done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now
done = pipeline.swap(segments, null, false, false);
if (iterationsCnt>2) { if (iterationsCnt>2) {
// practically it is impossible that this loop iterates more than two times // practically it is impossible that this loop iterates more than two times
// (because the compaction is stopped and none restarts it while in snapshot request), // (because the compaction is stopped and none restarts it while in snapshot request),

View File

@ -106,16 +106,12 @@ public class CompactionPipeline {
* removed. * removed.
* @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
* During index merge op this will be false and for compaction it will be true. * During index merge op this will be false and for compaction it will be true.
* @param updateRegionSize whether to update the region size. Update the region size,
* when the pipeline is swapped as part of in-memory-flush and
* further merge/compaction. Don't update the region size when the
* swap is result of the snapshot (flush-to-disk).
* @return true iff swapped tail with new segment * @return true iff swapped tail with new segment
*/ */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
justification="Increment is done under a synchronize block so safe") justification="Increment is done under a synchronize block so safe")
public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment, public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
boolean closeSuffix, boolean updateRegionSize) { boolean closeSuffix) {
if (versionedList.getVersion() != version) { if (versionedList.getVersion() != version) {
return false; return false;
} }
@ -139,7 +135,7 @@ public class CompactionPipeline {
readOnlyCopy = new LinkedList<>(pipeline); readOnlyCopy = new LinkedList<>(pipeline);
version++; version++;
} }
if (updateRegionSize && region != null) { if (closeSuffix && region != null) {
// update the global memstore size counter // update the global memstore size counter
long suffixDataSize = getSegmentsKeySize(suffix); long suffixDataSize = getSegmentsKeySize(suffix);
long newDataSize = 0; long newDataSize = 0;

View File

@ -99,7 +99,7 @@ public class ImmutableSegment extends Segment {
super(null, // initiailize the CellSet with NULL super(null, // initiailize the CellSet with NULL
comparator, memStoreLAB); comparator, memStoreLAB);
this.type = type; this.type = type;
// build the new CellSet based on CellArrayMap // build the true CellSet based on CellArrayMap
CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge); CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
this.setCellSet(null, cs); // update the CellSet of the new Segment this.setCellSet(null, cs); // update the CellSet of the new Segment
@ -203,7 +203,7 @@ public class ImmutableSegment extends Segment {
cells[i] = maybeCloneWithAllocator(c); cells[i] = maybeCloneWithAllocator(c);
} }
boolean useMSLAB = (getMemStoreLAB()!=null); boolean useMSLAB = (getMemStoreLAB()!=null);
// second parameter true, because in compaction/merge the addition of the cell to new segment // second parameter true, because in compaction addition of the cell to new segment
// is always successful // is always successful
updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
i++; i++;

View File

@ -44,26 +44,22 @@ import java.util.concurrent.atomic.AtomicBoolean;
@InterfaceAudience.Private @InterfaceAudience.Private
public class MemStoreCompactor { public class MemStoreCompactor {
// The upper bound for the number of segments we store in the pipeline prior to merging.
// This constant is subject to further experimentation.
// The external setting of the compacting MemStore behaviour
public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
"hbase.hregion.compacting.pipeline.segments.limit";
// remaining with the same ("infinity") but configurable default for now
public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 30;
public static final long DEEP_OVERHEAD = ClassSize public static final long DEEP_OVERHEAD = ClassSize
.align(ClassSize.OBJECT .align(ClassSize.OBJECT
+ 4 * ClassSize.REFERENCE + 4 * ClassSize.REFERENCE
// compactingMemStore, versionedList, action, isInterrupted (the reference) // compactingMemStore, versionedList, action, isInterrupted (the reference)
// "action" is an enum and thus it is a class with static final constants, // "action" is an enum and thus it is a class with static final constants,
// so counting only the size of the reference to it and not the size of the internals // so counting only the size of the reference to it and not the size of the internals
+ 2 * Bytes.SIZEOF_INT // compactionKVMax, pipelineThreshold + Bytes.SIZEOF_INT // compactionKVMax
+ ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals) + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
); );
// The upper bound for the number of segments we store in the pipeline prior to merging.
// This constant is subject to further experimentation.
private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
private final int pipelineThreshold; // the limit on the number of the segments in the pipeline
private CompactingMemStore compactingMemStore; private CompactingMemStore compactingMemStore;
// a static version of the segment list from the pipeline // a static version of the segment list from the pipeline
@ -95,9 +91,6 @@ public class MemStoreCompactor {
this.compactionKVMax = compactingMemStore.getConfiguration() this.compactionKVMax = compactingMemStore.getConfiguration()
.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
initiateAction(compactionPolicy); initiateAction(compactionPolicy);
pipelineThreshold = // get the limit on the number of the segments in the pipeline
compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY,
COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
} }
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
@ -168,7 +161,7 @@ public class MemStoreCompactor {
// compaction shouldn't happen or doesn't worth it // compaction shouldn't happen or doesn't worth it
// limit the number of the segments in the pipeline // limit the number of the segments in the pipeline
int numOfSegments = versionedList.getNumOfSegments(); int numOfSegments = versionedList.getNumOfSegments();
if (numOfSegments > pipelineThreshold) { if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) {
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be merged, as there are " + numOfSegments + " segments"); + " is going to be merged, as there are " + numOfSegments + " segments");
return Action.MERGE; // to avoid too many segments, merge now return Action.MERGE; // to avoid too many segments, merge now

View File

@ -756,24 +756,22 @@ public class TestWalAndCompactingMemStoreFlush {
} }
@Test(timeout = 180000) @Test(timeout = 180000)
public void testSelectiveFlushWithBasicAndMerge() throws IOException { public void testSelectiveFlushAndWALinIndexCompaction() throws IOException {
// Set up the configuration // Set up the configuration
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
FlushNonSloppyStoresFirstPolicy.class.getName()); FlushNonSloppyStoresFirstPolicy.class.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
75 * 1024); 200 * 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
// set memstore to do index compaction with merge // set memstore to do data compaction and not to use the speculative scan
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(MemoryCompactionPolicy.BASIC)); String.valueOf(MemoryCompactionPolicy.BASIC));
// length of pipeline that requires merge
conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
// Intialize the HRegion // Intialize the HRegion
HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf); HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
// Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
for (int i = 1; i <= 1200; i++) { for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i)); region.put(createPut(1, i));
if (i <= 100) { if (i <= 100) {
@ -783,7 +781,7 @@ public class TestWalAndCompactingMemStoreFlush {
} }
} }
} }
// Now put more entries to CF2 // Now add more puts for CF2, so that we only flush CF2 to disk
for (int i = 100; i < 2000; i++) { for (int i = 100; i < 2000; i++) {
region.put(createPut(2, i)); region.put(createPut(2, i));
} }
@ -802,14 +800,13 @@ public class TestWalAndCompactingMemStoreFlush {
// The total memstore size should be the same as the sum of the sizes of // The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3. // memstores of CF1, CF2 and CF3.
assertEquals(totalMemstoreSize, assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
+ cf3MemstoreSizePhaseI.getDataSize());
// Initiate in-memory Flush! // Flush!
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
// CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) { .isMemStoreFlushingInMemory()) {
Threads.sleep(10); Threads.sleep(10);
@ -818,22 +815,21 @@ public class TestWalAndCompactingMemStoreFlush {
.isMemStoreFlushingInMemory()) { .isMemStoreFlushingInMemory()) {
Threads.sleep(10); Threads.sleep(10);
} }
// Flush-to-disk! CF2 only should be flushed
region.flush(false); region.flush(false);
MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore();
MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
// CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller long smallestSeqInRegionCurrentMemstorePhaseII = region.getWAL()
assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize()); long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
// CF2 should have been cleared // CF2 should have been cleared
assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize());
// Add the same amount of entries to see the merging // Add same entries to compact them later
for (int i = 1; i <= 1200; i++) { for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i)); region.put(createPut(1, i));
if (i <= 100) { if (i <= 100) {
@ -848,12 +844,16 @@ public class TestWalAndCompactingMemStoreFlush {
region.put(createPut(2, i)); region.put(createPut(2, i));
} }
MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore(); long smallestSeqInRegionCurrentMemstorePhaseIII = region.getWAL()
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
// Flush in memory! // Flush!
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
// CF1 and CF3 should be merged so wait here to be sure the merge is done // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) { .isMemStoreFlushingInMemory()) {
Threads.sleep(10); Threads.sleep(10);
@ -864,28 +864,17 @@ public class TestWalAndCompactingMemStoreFlush {
} }
region.flush(false); region.flush(false);
MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore(); long smallestSeqInRegionCurrentMemstorePhaseIV = region.getWAL()
MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore(); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize()); // now check that the LSN of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge
assertEquals( assertFalse(
cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(), smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()); assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
assertEquals(3, // active, one in pipeline, snapshot assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size());
// CF2 should have been cleared
assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes,"
+ " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII
.getDataSize() + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII
.getHeapSize() + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/"
+ cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/"
+ cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize()
+ "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize()
+ "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes "
+ " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" + cf1MemstoreSizePhaseIV
.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" + cf1MemstoreSizePhaseIV
.getHeapSize() + "\n",
0, cf2MemstoreSizePhaseIV.getDataSize());
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
} }