diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index a9683ac762b..5da0de9a304 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -567,12 +567,12 @@ public class CompactingMemStore extends AbstractMemStore { boolean done = false; while (!done) { iterationsCnt++; - VersionedSegmentsList segments = getImmutableSegments(); + VersionedSegmentsList segments = pipeline.getVersionedList(); pushToSnapshot(segments.getStoreSegments()); // swap can return false in case the pipeline was updated by ongoing compaction // and the version increase, the chance of it happenning is very low // In Swap: don't close segments (they are in snapshot now) and don't update the region size - done = swapPipelineWithNull(segments); + done = pipeline.swap(segments, null, false, false); if (iterationsCnt>2) { // practically it is impossible that this loop iterates more than two times // (because the compaction is stopped and none restarts it while in snapshot request), @@ -585,10 +585,6 @@ public class CompactingMemStore extends AbstractMemStore { } } - protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { - return pipeline.swap(segments, null, false, false); - } - private void pushToSnapshot(List segments) { if(segments.isEmpty()) return; if(segments.size() == 1 && !segments.get(0).isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 2576f78ce68..62d1f5982e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.ListIterator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -65,16 +64,7 @@ public class CompactionPipeline { private final LinkedList pipeline = new LinkedList<>(); // The list is volatile to avoid reading a new allocated reference before the c'tor is executed private volatile LinkedList readOnlyCopy = new LinkedList<>(); - /** - *
-   * Version is volatile to ensure it is atomically read when not using a lock.
-   * To indicate whether the suffix of pipleline changes:
-   * 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
-   *   added at Head, {@link #version} not change.
-   * 2.for {@link CompactionPipeline#swap},{@link #version} increase.
-   * 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
-   * 
- */ + // Version is volatile to ensure it is atomically read when not using a lock private volatile long version = 0; public CompactionPipeline(RegionServicesForStores region) { @@ -105,7 +95,7 @@ public class CompactionPipeline { public VersionedSegmentsList getVersionedTail() { synchronized (pipeline){ - ArrayList segmentList = new ArrayList<>(); + List segmentList = new ArrayList<>(); if(!pipeline.isEmpty()) { segmentList.add(0, pipeline.getLast()); } @@ -300,15 +290,10 @@ public class CompactionPipeline { return memStoreSizing.getMemStoreSize(); } - /** - * Must be called under the {@link CompactionPipeline#pipeline} Lock. - */ private void swapSuffix(List suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { - matchAndRemoveSuffixFromPipeline(suffix); - if (segment != null) { - pipeline.addLast(segment); - } + pipeline.removeAll(suffix); + if(segment != null) pipeline.addLast(segment); // During index merge we won't be closing the segments undergoing the merge. Segment#close() // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data @@ -322,53 +307,11 @@ public class CompactionPipeline { } } - /** - * Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in - * {@link CompactionPipeline#pipeline} one by one from the last element to the first element of - * suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}.
- * Must be called under the {@link CompactionPipeline#pipeline} Lock. - */ - private void matchAndRemoveSuffixFromPipeline(List suffix) { - if (suffix.isEmpty()) { - return; - } - if (pipeline.size() < suffix.size()) { - throw new IllegalStateException( - "CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size() - + "],pipeline size must greater than or equals suffix size"); - } - - ListIterator suffixIterator = suffix.listIterator(suffix.size()); - ListIterator pipelineIterator = pipeline.listIterator(pipeline.size()); - int count = 0; - while (suffixIterator.hasPrevious()) { - Segment suffixSegment = suffixIterator.previous(); - Segment pipelineSegment = pipelineIterator.previous(); - if (suffixSegment != pipelineSegment) { - throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment - + " is not pipleline segment:[" + pipelineSegment + "]"); - } - count++; - } - - for (int index = 1; index <= count; index++) { - pipeline.pollLast(); - } - - } - // replacing one segment in the pipeline with a new one exactly at the same index // need to be called only within synchronized block - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", - justification = "replaceAtIndex is invoked under a synchronize block so safe") private void replaceAtIndex(int idx, ImmutableSegment newSegment) { pipeline.set(idx, newSegment); readOnlyCopy = new LinkedList<>(pipeline); - // the version increment is indeed needed, because the swap uses removeAll() method of the - // linked-list that compares the objects to find what to remove. - // The flattening changes the segment object completely (creation pattern) and so - // swap will not proceed correctly after concurrent flattening. - version++; } public Segment getTail() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 577cbcc2463..a8ad4edc289 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -97,7 +97,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; -import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; @@ -1939,225 +1938,6 @@ public class TestHStore { } } - /** - *
-    * This test is for HBASE-26384,
-   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
-   * execute concurrently.
-   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
-   * for both branch-2 and master):
-   * 1. The {@link CompactingMemStore} size exceeds
-   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
-   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
-   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
-   * 2. The in memory compact thread starts and then stopping before
-   *    {@link CompactingMemStore#flattenOneSegment}.
-   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
-   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
-   *    compact thread continues.
-   *    Assuming {@link VersionedSegmentsList#version} returned from
-   *    {@link CompactingMemStore#getImmutableSegments} is v.
-   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
-   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
-   *    {@link CompactionPipeline#version} is still v.
-   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
-   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
-   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
-   *    {@link CompactionPipeline} has changed because
-   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
-   *    removed in fact and still remaining in {@link CompactionPipeline}.
-   *
-   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
-   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
-   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
-   *    v+1.
-   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
-   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
-   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
-   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
-   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
-   * 
- */ - @Test - public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception { - Configuration conf = HBaseConfiguration.create(); - - byte[] smallValue = new byte[3]; - byte[] largeValue = new byte[9]; - final long timestamp = EnvironmentEdgeManager.currentTime(); - final long seqId = 100; - final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); - final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); - int smallCellByteSize = MutableSegment.getCellLength(smallCell); - int largeCellByteSize = MutableSegment.getCellLength(largeCell); - int totalCellByteSize = (smallCellByteSize + largeCellByteSize); - int flushByteSize = totalCellByteSize - 2; - - // set CompactingMemStore.inmemoryFlushSize to flushByteSize. - conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName()); - conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); - conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); - - init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) - .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); - - MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore); - assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); - - store.add(smallCell, new NonThreadSafeMemStoreSizing()); - store.add(largeCell, new NonThreadSafeMemStoreSizing()); - - String oldThreadName = Thread.currentThread().getName(); - try { - Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME); - /** - * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters - * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} - * would invoke {@link CompactingMemStore#stopCompaction}. - */ - myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); - - MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); - myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); - - assertTrue(memStoreSnapshot.getCellsCount() == 2); - assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize); - VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); - assertTrue(segments.getNumOfSegments() == 0); - assertTrue(segments.getNumOfCells() == 0); - assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1); - assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); - } finally { - Thread.currentThread().setName(oldThreadName); - } - } - - /** - *
-   * This test is for HBASE-26384,
-   * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
-   * and writeMemStore execute concurrently.
-   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
-   * for both branch-2 and master):
-   * 1. The {@link CompactingMemStore} size exceeds
-   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
-   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
-   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
-   * 2. The in memory compact thread starts and then stopping before
-   *    {@link CompactingMemStore#flattenOneSegment}.
-   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
-   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
-   *    compact thread continues.
-   *    Assuming {@link VersionedSegmentsList#version} returned from
-   *    {@link CompactingMemStore#getImmutableSegments} is v.
-   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
-   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
-   *    {@link CompactionPipeline#version} is still v.
-   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
-   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
-   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
-   *    {@link CompactionPipeline} has changed because
-   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
-   *    removed in fact and still remaining in {@link CompactionPipeline}.
-   *
-   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
-   * and I add step 7-8 to test there is new segment added before retry.
-   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
-   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
-   *     v+1.
-   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
-   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
-   *    failed and retry,{@link VersionedSegmentsList#version} returned from
-   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
-   * 7. The write thread continues writing to {@link CompactingMemStore} and
-   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
-   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
-   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
-   *    {@link CompactionPipeline#version} is still v+1.
-   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
-   *    {@link CompactionPipeline#version} is still v+1,
-   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
-   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
-   *    {@link CompactingMemStore#swapPipelineWithNull}.
-   * 
- */ - @Test - public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception { - Configuration conf = HBaseConfiguration.create(); - - byte[] smallValue = new byte[3]; - byte[] largeValue = new byte[9]; - final long timestamp = EnvironmentEdgeManager.currentTime(); - final long seqId = 100; - final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); - final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); - int smallCellByteSize = MutableSegment.getCellLength(smallCell); - int largeCellByteSize = MutableSegment.getCellLength(largeCell); - int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); - int flushByteSize = firstWriteCellByteSize - 2; - - // set CompactingMemStore.inmemoryFlushSize to flushByteSize. - conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName()); - conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); - conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); - - init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) - .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); - - final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore); - assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); - - store.add(smallCell, new NonThreadSafeMemStoreSizing()); - store.add(largeCell, new NonThreadSafeMemStoreSizing()); - - final AtomicReference exceptionRef = new AtomicReference(); - final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue); - final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); - final int writeAgainCellByteSize = MutableSegment.getCellLength(writeAgainCell1) - + MutableSegment.getCellLength(writeAgainCell2); - final Thread writeAgainThread = new Thread(() -> { - try { - myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await(); - - store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing()); - store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing()); - - myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await(); - } catch (Throwable exception) { - exceptionRef.set(exception); - } - }); - writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME); - writeAgainThread.start(); - - String oldThreadName = Thread.currentThread().getName(); - try { - Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME); - /** - * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters - * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} - * would invoke {@link CompactingMemStore#stopCompaction}. - */ - myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); - MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); - myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); - writeAgainThread.join(); - - assertTrue(memStoreSnapshot.getCellsCount() == 2); - assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize); - VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); - assertTrue(segments.getNumOfSegments() == 1); - assertTrue( - ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize); - assertTrue(segments.getNumOfCells() == 2); - assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2); - assertTrue(exceptionRef.get() == null); - assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); - } finally { - Thread.currentThread().setName(oldThreadName); - } - } - private HStoreFile mockStoreFileWithLength(long length) { HStoreFile sf = mock(HStoreFile.class); StoreFileReader sfr = mock(StoreFileReader.class); @@ -2523,293 +2303,4 @@ public class TestHStore { } } - - public static class MyCompactingMemStore4 extends CompactingMemStore { - private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; - /** - * {@link CompactingMemStore#flattenOneSegment} must execute after - * {@link CompactingMemStore#getImmutableSegments} - */ - private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); - /** - * Only after {@link CompactingMemStore#flattenOneSegment} completed, - * {@link CompactingMemStore#swapPipelineWithNull} could execute. - */ - private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); - /** - * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the - * snapshot thread starts {@link CompactingMemStore#snapshot},because - * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. - */ - private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); - /** - * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. - */ - private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); - private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); - private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); - private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); - private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); - - public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator, - HStore store, RegionServicesForStores regionServices, - MemoryCompactionPolicy compactionPolicy) throws IOException { - super(conf, cellComparator, store, regionServices, compactionPolicy); - } - - @Override - public VersionedSegmentsList getImmutableSegments() { - VersionedSegmentsList result = super.getImmutableSegments(); - if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { - int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); - if (currentCount <= 1) { - try { - flattenOneSegmentPreCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - } - return result; - } - - @Override - protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { - if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { - int currentCount = swapPipelineWithNullCounter.incrementAndGet(); - if (currentCount <= 1) { - try { - flattenOneSegmentPostCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - } - boolean result = super.swapPipelineWithNull(segments); - if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { - int currentCount = swapPipelineWithNullCounter.get(); - if (currentCount <= 1) { - assertTrue(!result); - } - if (currentCount == 2) { - assertTrue(result); - } - } - return result; - - } - - @Override - public void flattenOneSegment(long requesterVersion, Action action) { - int currentCount = flattenOneSegmentCounter.incrementAndGet(); - if (currentCount <= 1) { - try { - /** - * {@link CompactingMemStore#snapshot} could start. - */ - snapShotStartCyclicCyclicBarrier.await(); - flattenOneSegmentPreCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - super.flattenOneSegment(requesterVersion, action); - if (currentCount <= 1) { - try { - flattenOneSegmentPostCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - } - - @Override - protected boolean setInMemoryCompactionFlag() { - boolean result = super.setInMemoryCompactionFlag(); - assertTrue(result); - setInMemoryCompactionFlagCounter.incrementAndGet(); - return result; - } - - @Override - void inMemoryCompaction() { - try { - super.inMemoryCompaction(); - } finally { - try { - inMemoryCompactionEndCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - - } - } - - } - - public static class MyCompactingMemStore5 extends CompactingMemStore { - private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; - private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread"; - /** - * {@link CompactingMemStore#flattenOneSegment} must execute after - * {@link CompactingMemStore#getImmutableSegments} - */ - private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); - /** - * Only after {@link CompactingMemStore#flattenOneSegment} completed, - * {@link CompactingMemStore#swapPipelineWithNull} could execute. - */ - private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); - /** - * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the - * snapshot thread starts {@link CompactingMemStore#snapshot},because - * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. - */ - private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); - /** - * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. - */ - private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); - private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); - private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); - private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); - private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); - /** - * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain - * thread could start. - */ - private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2); - /** - * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the - * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would - * execute,and in memory compact thread would exit,because we expect that in memory compact - * executing only once. - */ - private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3); - - public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator, - HStore store, RegionServicesForStores regionServices, - MemoryCompactionPolicy compactionPolicy) throws IOException { - super(conf, cellComparator, store, regionServices, compactionPolicy); - } - - @Override - public VersionedSegmentsList getImmutableSegments() { - VersionedSegmentsList result = super.getImmutableSegments(); - if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { - int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); - if (currentCount <= 1) { - try { - flattenOneSegmentPreCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - } - - return result; - } - - @Override - protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { - if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { - int currentCount = swapPipelineWithNullCounter.incrementAndGet(); - if (currentCount <= 1) { - try { - flattenOneSegmentPostCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - if (currentCount == 2) { - try { - /** - * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, - * writeAgain thread could start. - */ - writeMemStoreAgainStartCyclicBarrier.await(); - /** - * Only the writeAgain thread completes, retry - * {@link CompactingMemStore#swapPipelineWithNull} would execute. - */ - writeMemStoreAgainEndCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - } - boolean result = super.swapPipelineWithNull(segments); - if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { - int currentCount = swapPipelineWithNullCounter.get(); - if (currentCount <= 1) { - assertTrue(!result); - } - if (currentCount == 2) { - assertTrue(result); - } - } - return result; - - } - - @Override - public void flattenOneSegment(long requesterVersion, Action action) { - int currentCount = flattenOneSegmentCounter.incrementAndGet(); - if (currentCount <= 1) { - try { - /** - * {@link CompactingMemStore#snapshot} could start. - */ - snapShotStartCyclicCyclicBarrier.await(); - flattenOneSegmentPreCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - super.flattenOneSegment(requesterVersion, action); - if (currentCount <= 1) { - try { - flattenOneSegmentPostCyclicBarrier.await(); - /** - * Only the writeAgain thread completes, in memory compact thread would exit,because we - * expect that in memory compact executing only once. - */ - writeMemStoreAgainEndCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - - } - } - - @Override - protected boolean setInMemoryCompactionFlag() { - boolean result = super.setInMemoryCompactionFlag(); - int count = setInMemoryCompactionFlagCounter.incrementAndGet(); - if (count <= 1) { - assertTrue(result); - } - if (count == 2) { - assertTrue(!result); - } - return result; - } - - @Override - void inMemoryCompaction() { - try { - super.inMemoryCompaction(); - } finally { - try { - inMemoryCompactionEndCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - - } - } - } }