From e89f62e611a124f5a82729ea905bf4e7a0e0be4d Mon Sep 17 00:00:00 2001 From: chenglei Date: Mon, 1 Nov 2021 23:05:57 +0800 Subject: [PATCH] HBASE-26384 Segment already flushed to hfile may still be remained in CompactingMemStore (#3779) Signed-off-by: Duo Zhang --- .../regionserver/CompactingMemStore.java | 8 +- .../regionserver/CompactionPipeline.java | 65 ++- .../hadoop/hbase/regionserver/TestHStore.java | 509 ++++++++++++++++++ 3 files changed, 576 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 5da0de9a304..a9683ac762b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -567,12 +567,12 @@ public class CompactingMemStore extends AbstractMemStore { boolean done = false; while (!done) { iterationsCnt++; - VersionedSegmentsList segments = pipeline.getVersionedList(); + VersionedSegmentsList segments = getImmutableSegments(); pushToSnapshot(segments.getStoreSegments()); // swap can return false in case the pipeline was updated by ongoing compaction // and the version increase, the chance of it happenning is very low // In Swap: don't close segments (they are in snapshot now) and don't update the region size - done = pipeline.swap(segments, null, false, false); + done = swapPipelineWithNull(segments); if (iterationsCnt>2) { // practically it is impossible that this loop iterates more than two times // (because the compaction is stopped and none restarts it while in snapshot request), @@ -585,6 +585,10 @@ public class CompactingMemStore extends AbstractMemStore { } } + protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { + return pipeline.swap(segments, null, false, false); + } + private void pushToSnapshot(List segments) { if(segments.isEmpty()) return; if(segments.size() == 1 && !segments.get(0).isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 62d1f5982e0..2576f78ce68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -64,7 +65,16 @@ public class CompactionPipeline { private final LinkedList pipeline = new LinkedList<>(); // The list is volatile to avoid reading a new allocated reference before the c'tor is executed private volatile LinkedList readOnlyCopy = new LinkedList<>(); - // Version is volatile to ensure it is atomically read when not using a lock + /** + *
+   * Version is volatile to ensure it is atomically read when not using a lock.
+   * To indicate whether the suffix of pipleline changes:
+   * 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
+   *   added at Head, {@link #version} not change.
+   * 2.for {@link CompactionPipeline#swap},{@link #version} increase.
+   * 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
+   * 
+ */ private volatile long version = 0; public CompactionPipeline(RegionServicesForStores region) { @@ -95,7 +105,7 @@ public class CompactionPipeline { public VersionedSegmentsList getVersionedTail() { synchronized (pipeline){ - List segmentList = new ArrayList<>(); + ArrayList segmentList = new ArrayList<>(); if(!pipeline.isEmpty()) { segmentList.add(0, pipeline.getLast()); } @@ -290,10 +300,15 @@ public class CompactionPipeline { return memStoreSizing.getMemStoreSize(); } + /** + * Must be called under the {@link CompactionPipeline#pipeline} Lock. + */ private void swapSuffix(List suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { - pipeline.removeAll(suffix); - if(segment != null) pipeline.addLast(segment); + matchAndRemoveSuffixFromPipeline(suffix); + if (segment != null) { + pipeline.addLast(segment); + } // During index merge we won't be closing the segments undergoing the merge. Segment#close() // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data @@ -307,11 +322,53 @@ public class CompactionPipeline { } } + /** + * Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in + * {@link CompactionPipeline#pipeline} one by one from the last element to the first element of + * suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}.
+ * Must be called under the {@link CompactionPipeline#pipeline} Lock. + */ + private void matchAndRemoveSuffixFromPipeline(List suffix) { + if (suffix.isEmpty()) { + return; + } + if (pipeline.size() < suffix.size()) { + throw new IllegalStateException( + "CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size() + + "],pipeline size must greater than or equals suffix size"); + } + + ListIterator suffixIterator = suffix.listIterator(suffix.size()); + ListIterator pipelineIterator = pipeline.listIterator(pipeline.size()); + int count = 0; + while (suffixIterator.hasPrevious()) { + Segment suffixSegment = suffixIterator.previous(); + Segment pipelineSegment = pipelineIterator.previous(); + if (suffixSegment != pipelineSegment) { + throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment + + " is not pipleline segment:[" + pipelineSegment + "]"); + } + count++; + } + + for (int index = 1; index <= count; index++) { + pipeline.pollLast(); + } + + } + // replacing one segment in the pipeline with a new one exactly at the same index // need to be called only within synchronized block + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", + justification = "replaceAtIndex is invoked under a synchronize block so safe") private void replaceAtIndex(int idx, ImmutableSegment newSegment) { pipeline.set(idx, newSegment); readOnlyCopy = new LinkedList<>(pipeline); + // the version increment is indeed needed, because the swap uses removeAll() method of the + // linked-list that compares the objects to find what to remove. + // The flattening changes the segment object completely (creation pattern) and so + // swap will not proceed correctly after concurrent flattening. + version++; } public Segment getTail() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index b0cb1ee52fe..3a0adb1a918 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -97,6 +97,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; +import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; @@ -1935,6 +1936,225 @@ public class TestHStore { } } + /** + *
+    * This test is for HBASE-26384,
+   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
+   * execute concurrently.
+   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
+   * for both branch-2 and master):
+   * 1. The {@link CompactingMemStore} size exceeds
+   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
+   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
+   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
+   * 2. The in memory compact thread starts and then stopping before
+   *    {@link CompactingMemStore#flattenOneSegment}.
+   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
+   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
+   *    compact thread continues.
+   *    Assuming {@link VersionedSegmentsList#version} returned from
+   *    {@link CompactingMemStore#getImmutableSegments} is v.
+   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
+   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
+   *    {@link CompactionPipeline#version} is still v.
+   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
+   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
+   *    {@link CompactionPipeline} has changed because
+   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
+   *    removed in fact and still remaining in {@link CompactionPipeline}.
+   *
+   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
+   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
+   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
+   *    v+1.
+   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
+   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
+   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
+   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
+   * 
+ */ + @Test + public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + byte[] smallValue = new byte[3]; + byte[] largeValue = new byte[9]; + final long timestamp = EnvironmentEdgeManager.currentTime(); + final long seqId = 100; + final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); + final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); + int smallCellByteSize = MutableSegment.getCellLength(smallCell); + int largeCellByteSize = MutableSegment.getCellLength(largeCell); + int totalCellByteSize = (smallCellByteSize + largeCellByteSize); + int flushByteSize = totalCellByteSize - 2; + + // set CompactingMemStore.inmemoryFlushSize to flushByteSize. + conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName()); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); + + init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) + .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); + + MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore); + assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); + + store.add(smallCell, new NonThreadSafeMemStoreSizing()); + store.add(largeCell, new NonThreadSafeMemStoreSizing()); + + String oldThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME); + /** + * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters + * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} + * would invoke {@link CompactingMemStore#stopCompaction}. + */ + myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); + + MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); + myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); + + assertTrue(memStoreSnapshot.getCellsCount() == 2); + assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize); + VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); + assertTrue(segments.getNumOfSegments() == 0); + assertTrue(segments.getNumOfCells() == 0); + assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1); + assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); + } finally { + Thread.currentThread().setName(oldThreadName); + } + } + + /** + *
+   * This test is for HBASE-26384,
+   * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
+   * and writeMemStore execute concurrently.
+   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
+   * for both branch-2 and master):
+   * 1. The {@link CompactingMemStore} size exceeds
+   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
+   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
+   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
+   * 2. The in memory compact thread starts and then stopping before
+   *    {@link CompactingMemStore#flattenOneSegment}.
+   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
+   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
+   *    compact thread continues.
+   *    Assuming {@link VersionedSegmentsList#version} returned from
+   *    {@link CompactingMemStore#getImmutableSegments} is v.
+   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
+   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
+   *    {@link CompactionPipeline#version} is still v.
+   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
+   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
+   *    {@link CompactionPipeline} has changed because
+   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
+   *    removed in fact and still remaining in {@link CompactionPipeline}.
+   *
+   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
+   * and I add step 7-8 to test there is new segment added before retry.
+   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
+   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
+   *     v+1.
+   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
+   *    failed and retry,{@link VersionedSegmentsList#version} returned from
+   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
+   * 7. The write thread continues writing to {@link CompactingMemStore} and
+   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
+   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
+   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
+   *    {@link CompactionPipeline#version} is still v+1.
+   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+   *    {@link CompactionPipeline#version} is still v+1,
+   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
+   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
+   *    {@link CompactingMemStore#swapPipelineWithNull}.
+   * 
+ */ + @Test + public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + byte[] smallValue = new byte[3]; + byte[] largeValue = new byte[9]; + final long timestamp = EnvironmentEdgeManager.currentTime(); + final long seqId = 100; + final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); + final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); + int smallCellByteSize = MutableSegment.getCellLength(smallCell); + int largeCellByteSize = MutableSegment.getCellLength(largeCell); + int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); + int flushByteSize = firstWriteCellByteSize - 2; + + // set CompactingMemStore.inmemoryFlushSize to flushByteSize. + conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName()); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); + + init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) + .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); + + final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore); + assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); + + store.add(smallCell, new NonThreadSafeMemStoreSizing()); + store.add(largeCell, new NonThreadSafeMemStoreSizing()); + + final AtomicReference exceptionRef = new AtomicReference(); + final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue); + final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); + final int writeAgainCellByteSize = MutableSegment.getCellLength(writeAgainCell1) + + MutableSegment.getCellLength(writeAgainCell2); + final Thread writeAgainThread = new Thread(() -> { + try { + myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await(); + + store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing()); + store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing()); + + myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await(); + } catch (Throwable exception) { + exceptionRef.set(exception); + } + }); + writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME); + writeAgainThread.start(); + + String oldThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME); + /** + * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters + * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} + * would invoke {@link CompactingMemStore#stopCompaction}. + */ + myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); + MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); + myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); + writeAgainThread.join(); + + assertTrue(memStoreSnapshot.getCellsCount() == 2); + assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize); + VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); + assertTrue(segments.getNumOfSegments() == 1); + assertTrue( + ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize); + assertTrue(segments.getNumOfCells() == 2); + assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2); + assertTrue(exceptionRef.get() == null); + assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); + } finally { + Thread.currentThread().setName(oldThreadName); + } + } + private HStoreFile mockStoreFileWithLength(long length) { HStoreFile sf = mock(HStoreFile.class); StoreFileReader sfr = mock(StoreFileReader.class); @@ -2300,4 +2520,293 @@ public class TestHStore { } } + + public static class MyCompactingMemStore4 extends CompactingMemStore { + private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; + /** + * {@link CompactingMemStore#flattenOneSegment} must execute after + * {@link CompactingMemStore#getImmutableSegments} + */ + private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); + /** + * Only after {@link CompactingMemStore#flattenOneSegment} completed, + * {@link CompactingMemStore#swapPipelineWithNull} could execute. + */ + private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); + /** + * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the + * snapshot thread starts {@link CompactingMemStore#snapshot},because + * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. + */ + private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); + /** + * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. + */ + private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); + private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); + private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); + private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); + private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); + + public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator, + HStore store, RegionServicesForStores regionServices, + MemoryCompactionPolicy compactionPolicy) throws IOException { + super(conf, cellComparator, store, regionServices, compactionPolicy); + } + + @Override + public VersionedSegmentsList getImmutableSegments() { + VersionedSegmentsList result = super.getImmutableSegments(); + if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { + int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); + if (currentCount <= 1) { + try { + flattenOneSegmentPreCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + return result; + } + + @Override + protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { + if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { + int currentCount = swapPipelineWithNullCounter.incrementAndGet(); + if (currentCount <= 1) { + try { + flattenOneSegmentPostCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + boolean result = super.swapPipelineWithNull(segments); + if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { + int currentCount = swapPipelineWithNullCounter.get(); + if (currentCount <= 1) { + assertTrue(!result); + } + if (currentCount == 2) { + assertTrue(result); + } + } + return result; + + } + + @Override + public void flattenOneSegment(long requesterVersion, Action action) { + int currentCount = flattenOneSegmentCounter.incrementAndGet(); + if (currentCount <= 1) { + try { + /** + * {@link CompactingMemStore#snapshot} could start. + */ + snapShotStartCyclicCyclicBarrier.await(); + flattenOneSegmentPreCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + super.flattenOneSegment(requesterVersion, action); + if (currentCount <= 1) { + try { + flattenOneSegmentPostCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + + @Override + protected boolean setInMemoryCompactionFlag() { + boolean result = super.setInMemoryCompactionFlag(); + assertTrue(result); + setInMemoryCompactionFlagCounter.incrementAndGet(); + return result; + } + + @Override + void inMemoryCompaction() { + try { + super.inMemoryCompaction(); + } finally { + try { + inMemoryCompactionEndCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + } + } + + } + + public static class MyCompactingMemStore5 extends CompactingMemStore { + private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; + private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread"; + /** + * {@link CompactingMemStore#flattenOneSegment} must execute after + * {@link CompactingMemStore#getImmutableSegments} + */ + private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); + /** + * Only after {@link CompactingMemStore#flattenOneSegment} completed, + * {@link CompactingMemStore#swapPipelineWithNull} could execute. + */ + private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); + /** + * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the + * snapshot thread starts {@link CompactingMemStore#snapshot},because + * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. + */ + private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); + /** + * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. + */ + private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); + private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); + private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); + private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); + private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); + /** + * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain + * thread could start. + */ + private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2); + /** + * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the + * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would + * execute,and in memory compact thread would exit,because we expect that in memory compact + * executing only once. + */ + private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3); + + public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator, + HStore store, RegionServicesForStores regionServices, + MemoryCompactionPolicy compactionPolicy) throws IOException { + super(conf, cellComparator, store, regionServices, compactionPolicy); + } + + @Override + public VersionedSegmentsList getImmutableSegments() { + VersionedSegmentsList result = super.getImmutableSegments(); + if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { + int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); + if (currentCount <= 1) { + try { + flattenOneSegmentPreCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + } + + return result; + } + + @Override + protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { + if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { + int currentCount = swapPipelineWithNullCounter.incrementAndGet(); + if (currentCount <= 1) { + try { + flattenOneSegmentPostCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + if (currentCount == 2) { + try { + /** + * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, + * writeAgain thread could start. + */ + writeMemStoreAgainStartCyclicBarrier.await(); + /** + * Only the writeAgain thread completes, retry + * {@link CompactingMemStore#swapPipelineWithNull} would execute. + */ + writeMemStoreAgainEndCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + } + boolean result = super.swapPipelineWithNull(segments); + if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { + int currentCount = swapPipelineWithNullCounter.get(); + if (currentCount <= 1) { + assertTrue(!result); + } + if (currentCount == 2) { + assertTrue(result); + } + } + return result; + + } + + @Override + public void flattenOneSegment(long requesterVersion, Action action) { + int currentCount = flattenOneSegmentCounter.incrementAndGet(); + if (currentCount <= 1) { + try { + /** + * {@link CompactingMemStore#snapshot} could start. + */ + snapShotStartCyclicCyclicBarrier.await(); + flattenOneSegmentPreCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + super.flattenOneSegment(requesterVersion, action); + if (currentCount <= 1) { + try { + flattenOneSegmentPostCyclicBarrier.await(); + /** + * Only the writeAgain thread completes, in memory compact thread would exit,because we + * expect that in memory compact executing only once. + */ + writeMemStoreAgainEndCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + } + } + + @Override + protected boolean setInMemoryCompactionFlag() { + boolean result = super.setInMemoryCompactionFlag(); + int count = setInMemoryCompactionFlagCounter.incrementAndGet(); + if (count <= 1) { + assertTrue(result); + } + if (count == 2) { + assertTrue(!result); + } + return result; + } + + @Override + void inMemoryCompaction() { + try { + super.inMemoryCompaction(); + } finally { + try { + inMemoryCompactionEndCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + } + } + } }