HBASE-17765 Reviving the merge of the compacting pipeline

making the limit on the number of the segments in the pipeline configurable, adding merge test, fixing bug in sizes counting
This commit is contained in:
anastas 2017-03-27 15:41:32 +03:00 committed by Sean Busbey
parent 046d4e183b
commit cb4fac1d18
5 changed files with 75 additions and 50 deletions

View File

@ -279,7 +279,8 @@ public class CompactingMemStore extends AbstractMemStore {
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
boolean merge) {
return pipeline.swap(versionedList, result, !merge);
// last true stands for updating the region size
return pipeline.swap(versionedList, result, !merge, true);
}
/**
@ -437,7 +438,8 @@ public class CompactingMemStore extends AbstractMemStore {
private void pushTailToSnapshot() {
VersionedSegmentsList segments = pipeline.getVersionedTail();
pushToSnapshot(segments.getStoreSegments());
pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
// In Swap: don't close segments (they are in snapshot now) and don't update the region size
pipeline.swap(segments,null,false, false);
}
private void pushPipelineToSnapshot() {
@ -449,7 +451,8 @@ public class CompactingMemStore extends AbstractMemStore {
pushToSnapshot(segments.getStoreSegments());
// swap can return false in case the pipeline was updated by ongoing compaction
// and the version increase, the chance of it happenning is very low
done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now
// In Swap: don't close segments (they are in snapshot now) and don't update the region size
done = pipeline.swap(segments, null, false, false);
if (iterationsCnt>2) {
// practically it is impossible that this loop iterates more than two times
// (because the compaction is stopped and none restarts it while in snapshot request),

View File

@ -106,12 +106,16 @@ public class CompactionPipeline {
* removed.
* @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
* During index merge op this will be false and for compaction it will be true.
* @param updateRegionSize whether to update the region size. Update the region size,
* when the pipeline is swapped as part of in-memory-flush and
* further merge/compaction. Don't update the region size when the
* swap is result of the snapshot (flush-to-disk).
* @return true iff swapped tail with new segment
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
justification="Increment is done under a synchronize block so safe")
public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
boolean closeSuffix) {
boolean closeSuffix, boolean updateRegionSize) {
if (versionedList.getVersion() != version) {
return false;
}
@ -135,7 +139,7 @@ public class CompactionPipeline {
readOnlyCopy = new LinkedList<>(pipeline);
version++;
}
if (closeSuffix && region != null) {
if (updateRegionSize && region != null) {
// update the global memstore size counter
long suffixDataSize = getSegmentsKeySize(suffix);
long newDataSize = 0;

View File

@ -99,7 +99,7 @@ public class ImmutableSegment extends Segment {
super(null, // initiailize the CellSet with NULL
comparator, memStoreLAB);
this.type = type;
// build the true CellSet based on CellArrayMap
// build the new CellSet based on CellArrayMap
CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
this.setCellSet(null, cs); // update the CellSet of the new Segment
@ -203,7 +203,7 @@ public class ImmutableSegment extends Segment {
cells[i] = maybeCloneWithAllocator(c);
}
boolean useMSLAB = (getMemStoreLAB()!=null);
// second parameter true, because in compaction addition of the cell to new segment
// second parameter true, because in compaction/merge the addition of the cell to new segment
// is always successful
updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
i++;

View File

@ -44,22 +44,26 @@ import java.util.concurrent.atomic.AtomicBoolean;
@InterfaceAudience.Private
public class MemStoreCompactor {
// The upper bound for the number of segments we store in the pipeline prior to merging.
// This constant is subject to further experimentation.
// The external setting of the compacting MemStore behaviour
public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
"hbase.hregion.compacting.pipeline.segments.limit";
// remaining with the same ("infinity") but configurable default for now
public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 30;
public static final long DEEP_OVERHEAD = ClassSize
.align(ClassSize.OBJECT
+ 4 * ClassSize.REFERENCE
// compactingMemStore, versionedList, action, isInterrupted (the reference)
// "action" is an enum and thus it is a class with static final constants,
// so counting only the size of the reference to it and not the size of the internals
+ Bytes.SIZEOF_INT // compactionKVMax
+ 2 * Bytes.SIZEOF_INT // compactionKVMax, pipelineThreshold
+ ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
);
// The upper bound for the number of segments we store in the pipeline prior to merging.
// This constant is subject to further experimentation.
private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
private final int pipelineThreshold; // the limit on the number of the segments in the pipeline
private CompactingMemStore compactingMemStore;
// a static version of the segment list from the pipeline
@ -91,6 +95,9 @@ public class MemStoreCompactor {
this.compactionKVMax = compactingMemStore.getConfiguration()
.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
initiateAction(compactionPolicy);
pipelineThreshold = // get the limit on the number of the segments in the pipeline
compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY,
COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
}
/**----------------------------------------------------------------------
@ -161,7 +168,7 @@ public class MemStoreCompactor {
// compaction shouldn't happen or doesn't worth it
// limit the number of the segments in the pipeline
int numOfSegments = versionedList.getNumOfSegments();
if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) {
if (numOfSegments > pipelineThreshold) {
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be merged, as there are " + numOfSegments + " segments");
return Action.MERGE; // to avoid too many segments, merge now

View File

@ -756,22 +756,24 @@ public class TestWalAndCompactingMemStoreFlush {
}
@Test(timeout = 180000)
public void testSelectiveFlushAndWALinIndexCompaction() throws IOException {
public void testSelectiveFlushWithBasicAndMerge() throws IOException {
// Set up the configuration
Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
FlushNonSloppyStoresFirstPolicy.class.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
200 * 1024);
75 * 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
// set memstore to do data compaction and not to use the speculative scan
// set memstore to do index compaction with merge
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(MemoryCompactionPolicy.BASIC));
// length of pipeline that requires merge
conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
// Intialize the HRegion
HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
// Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
// Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3
for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i));
if (i <= 100) {
@ -781,7 +783,7 @@ public class TestWalAndCompactingMemStoreFlush {
}
}
}
// Now add more puts for CF2, so that we only flush CF2 to disk
// Now put more entries to CF2
for (int i = 100; i < 2000; i++) {
region.put(createPut(2, i));
}
@ -800,13 +802,14 @@ public class TestWalAndCompactingMemStoreFlush {
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3.
assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+ cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
assertEquals(totalMemstoreSize,
cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize()
+ cf3MemstoreSizePhaseI.getDataSize());
// Flush!
// Initiate in-memory Flush!
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
// CF1 and CF3 should be compacted so wait here to be sure the compaction is done
// CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
@ -815,21 +818,22 @@ public class TestWalAndCompactingMemStoreFlush {
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
// Flush-to-disk! CF2 only should be flushed
region.flush(false);
MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore();
MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
long smallestSeqInRegionCurrentMemstorePhaseII = region.getWAL()
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
// CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller
assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize());
// CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same
assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize());
// CF2 should have been cleared
assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize());
// Add same entries to compact them later
// Add the same amount of entries to see the merging
for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i));
if (i <= 100) {
@ -844,16 +848,12 @@ public class TestWalAndCompactingMemStoreFlush {
region.put(createPut(2, i));
}
long smallestSeqInRegionCurrentMemstorePhaseIII = region.getWAL()
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore();
// Flush!
// Flush in memory!
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
// CF1 and CF3 should be compacted so wait here to be sure the compaction is done
// CF1 and CF3 should be merged so wait here to be sure the merge is done
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
@ -864,17 +864,28 @@ public class TestWalAndCompactingMemStoreFlush {
}
region.flush(false);
long smallestSeqInRegionCurrentMemstorePhaseIV = region.getWAL()
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore();
MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
// now check that the LSN of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge
assertFalse(
smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
assertEquals(
cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize());
assertEquals(3, // active, one in pipeline, snapshot
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size());
// CF2 should have been cleared
assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes,"
+ " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII
.getDataSize() + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII
.getHeapSize() + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/"
+ cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/"
+ cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize()
+ "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize()
+ "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes "
+ " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" + cf1MemstoreSizePhaseIV
.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" + cf1MemstoreSizePhaseIV
.getHeapSize() + "\n",
0, cf2MemstoreSizePhaseIV.getDataSize());
HBaseTestingUtility.closeRegionAndWAL(region);
}