HBASE-26384 Segment already flushed to hfile may still be remained in CompactingMemStore (#3779)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
chenglei 2021-11-01 23:05:57 +08:00 committed by Andrew Purtell
parent 563a404ac1
commit 230ce3e6b5
3 changed files with 576 additions and 6 deletions

View File

@ -567,12 +567,12 @@ public class CompactingMemStore extends AbstractMemStore {
boolean done = false;
while (!done) {
iterationsCnt++;
VersionedSegmentsList segments = pipeline.getVersionedList();
VersionedSegmentsList segments = getImmutableSegments();
pushToSnapshot(segments.getStoreSegments());
// swap can return false in case the pipeline was updated by ongoing compaction
// and the version increase, the chance of it happenning is very low
// In Swap: don't close segments (they are in snapshot now) and don't update the region size
done = pipeline.swap(segments, null, false, false);
done = swapPipelineWithNull(segments);
if (iterationsCnt>2) {
// practically it is impossible that this loop iterates more than two times
// (because the compaction is stopped and none restarts it while in snapshot request),
@ -585,6 +585,10 @@ public class CompactingMemStore extends AbstractMemStore {
}
}
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
return pipeline.swap(segments, null, false, false);
}
private void pushToSnapshot(List<ImmutableSegment> segments) {
if(segments.isEmpty()) return;
if(segments.size() == 1 && !segments.get(0).isEmpty()) {

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -64,7 +65,16 @@ public class CompactionPipeline {
private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
// The list is volatile to avoid reading a new allocated reference before the c'tor is executed
private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
// Version is volatile to ensure it is atomically read when not using a lock
/**
* <pre>
* Version is volatile to ensure it is atomically read when not using a lock.
* To indicate whether the suffix of pipleline changes
* 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
* added at Head, {@link #version} not change.
* 2.for {@link CompactionPipeline#swap},{@link #version} increase.
* 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
* </pre>
*/
private volatile long version = 0;
public CompactionPipeline(RegionServicesForStores region) {
@ -95,7 +105,7 @@ public class CompactionPipeline {
public VersionedSegmentsList getVersionedTail() {
synchronized (pipeline){
List<ImmutableSegment> segmentList = new ArrayList<>();
ArrayList<ImmutableSegment> segmentList = new ArrayList<>();
if(!pipeline.isEmpty()) {
segmentList.add(0, pipeline.getLast());
}
@ -290,10 +300,15 @@ public class CompactionPipeline {
return memStoreSizing.getMemStoreSize();
}
/**
* Must be called under the {@link CompactionPipeline#pipeline} Lock.
*/
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
pipeline.removeAll(suffix);
if(segment != null) pipeline.addLast(segment);
matchAndRemoveSuffixFromPipeline(suffix);
if (segment != null) {
pipeline.addLast(segment);
}
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
@ -307,11 +322,53 @@ public class CompactionPipeline {
}
}
/**
* Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in
* {@link CompactionPipeline#pipeline} one by one from the last element to the first element of
* suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}. <br/>
* Must be called under the {@link CompactionPipeline#pipeline} Lock.
*/
private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
if (suffix.isEmpty()) {
return;
}
if (pipeline.size() < suffix.size()) {
throw new IllegalStateException(
"CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
+ "],pipeline size must greater than or equals suffix size");
}
ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
int count = 0;
while (suffixIterator.hasPrevious()) {
Segment suffixSegment = suffixIterator.previous();
Segment pipelineSegment = pipelineIterator.previous();
if (suffixSegment != pipelineSegment) {
throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
+ " is not pipleline segment:[" + pipelineSegment + "]");
}
count++;
}
for (int index = 1; index <= count; index++) {
pipeline.pollLast();
}
}
// replacing one segment in the pipeline with a new one exactly at the same index
// need to be called only within synchronized block
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
justification = "replaceAtIndex is invoked under a synchronize block so safe")
private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
pipeline.set(idx, newSegment);
readOnlyCopy = new LinkedList<>(pipeline);
// the version increment is indeed needed, because the swap uses removeAll() method of the
// linked-list that compares the objects to find what to remove.
// The flattening changes the segment object completely (creation pattern) and so
// swap will not proceed correctly after concurrent flattening.
version++;
}
public Segment getTail() {

View File

@ -97,6 +97,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@ -1938,6 +1939,225 @@ public class TestHStore {
}
}
/**
* <pre>
* This test is for HBASE-26384,
* test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
* execute concurrently.
* The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
* for both branch-2 and master):
* 1. The {@link CompactingMemStore} size exceeds
* {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
* {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a
* in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
* 2. The in memory compact thread starts and then stopping before
* {@link CompactingMemStore#flattenOneSegment}.
* 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
* snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
* compact thread continues.
* Assuming {@link VersionedSegmentsList#version} returned from
* {@link CompactingMemStore#getImmutableSegments} is v.
* 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
* {@link CompactionPipeline#version} is still v.
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
* {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
* thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
* {@link CompactionPipeline} has changed because
* {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
* removed in fact and still remaining in {@link CompactionPipeline}.
*
* After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
* {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
* v+1.
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
* {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
* failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
* again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
* {@link CompactingMemStore#swapPipelineWithNull} succeeds.
* </pre>
*/
@Test
public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
int flushByteSize = totalCellByteSize - 2;
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
store.add(smallCell, new NonThreadSafeMemStoreSizing());
store.add(largeCell, new NonThreadSafeMemStoreSizing());
String oldThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
/**
* {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
* {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
* would invoke {@link CompactingMemStore#stopCompaction}.
*/
myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
assertTrue(memStoreSnapshot.getCellsCount() == 2);
assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
assertTrue(segments.getNumOfSegments() == 0);
assertTrue(segments.getNumOfCells() == 0);
assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
} finally {
Thread.currentThread().setName(oldThreadName);
}
}
/**
* <pre>
* This test is for HBASE-26384,
* test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
* and writeMemStore execute concurrently.
* The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
* for both branch-2 and master):
* 1. The {@link CompactingMemStore} size exceeds
* {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
* {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a
* in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
* 2. The in memory compact thread starts and then stopping before
* {@link CompactingMemStore#flattenOneSegment}.
* 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
* snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
* compact thread continues.
* Assuming {@link VersionedSegmentsList#version} returned from
* {@link CompactingMemStore#getImmutableSegments} is v.
* 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
* {@link CompactionPipeline#version} is still v.
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
* {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
* thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
* {@link CompactionPipeline} has changed because
* {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
* removed in fact and still remaining in {@link CompactionPipeline}.
*
* After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
* and I add step 7-8 to test there is new segment added before retry.
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
* {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
* v+1.
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
* {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
* failed and retry,{@link VersionedSegmentsList#version} returned from
* {@link CompactingMemStore#getImmutableSegments} is v+1.
* 7. The write thread continues writing to {@link CompactingMemStore} and
* {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
* {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
* {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
* {@link CompactionPipeline#version} is still v+1.
* 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
* {@link CompactionPipeline#version} is still v+1,
* {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
* remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
* {@link CompactingMemStore#swapPipelineWithNull}.
* </pre>
*/
@Test
public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
int flushByteSize = firstWriteCellByteSize - 2;
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
store.add(smallCell, new NonThreadSafeMemStoreSizing());
store.add(largeCell, new NonThreadSafeMemStoreSizing());
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
final int writeAgainCellByteSize = MutableSegment.getCellLength(writeAgainCell1)
+ MutableSegment.getCellLength(writeAgainCell2);
final Thread writeAgainThread = new Thread(() -> {
try {
myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
} catch (Throwable exception) {
exceptionRef.set(exception);
}
});
writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
writeAgainThread.start();
String oldThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
/**
* {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
* {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
* would invoke {@link CompactingMemStore#stopCompaction}.
*/
myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
writeAgainThread.join();
assertTrue(memStoreSnapshot.getCellsCount() == 2);
assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
assertTrue(segments.getNumOfSegments() == 1);
assertTrue(
((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
assertTrue(segments.getNumOfCells() == 2);
assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
assertTrue(exceptionRef.get() == null);
assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
} finally {
Thread.currentThread().setName(oldThreadName);
}
}
private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
@ -2303,4 +2523,293 @@ public class TestHStore {
}
}
public static class MyCompactingMemStore4 extends CompactingMemStore {
private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
/**
* {@link CompactingMemStore#flattenOneSegment} must execute after
* {@link CompactingMemStore#getImmutableSegments}
*/
private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
/**
* Only after {@link CompactingMemStore#flattenOneSegment} completed,
* {@link CompactingMemStore#swapPipelineWithNull} could execute.
*/
private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
/**
* Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
* snapshot thread starts {@link CompactingMemStore#snapshot},because
* {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
*/
private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
/**
* To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
*/
private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
HStore store, RegionServicesForStores regionServices,
MemoryCompactionPolicy compactionPolicy) throws IOException {
super(conf, cellComparator, store, regionServices, compactionPolicy);
}
@Override
public VersionedSegmentsList getImmutableSegments() {
VersionedSegmentsList result = super.getImmutableSegments();
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
if (currentCount <= 1) {
try {
flattenOneSegmentPreCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
return result;
}
@Override
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = swapPipelineWithNullCounter.incrementAndGet();
if (currentCount <= 1) {
try {
flattenOneSegmentPostCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
boolean result = super.swapPipelineWithNull(segments);
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = swapPipelineWithNullCounter.get();
if (currentCount <= 1) {
assertTrue(!result);
}
if (currentCount == 2) {
assertTrue(result);
}
}
return result;
}
@Override
public void flattenOneSegment(long requesterVersion, Action action) {
int currentCount = flattenOneSegmentCounter.incrementAndGet();
if (currentCount <= 1) {
try {
/**
* {@link CompactingMemStore#snapshot} could start.
*/
snapShotStartCyclicCyclicBarrier.await();
flattenOneSegmentPreCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
super.flattenOneSegment(requesterVersion, action);
if (currentCount <= 1) {
try {
flattenOneSegmentPostCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
@Override
protected boolean setInMemoryCompactionFlag() {
boolean result = super.setInMemoryCompactionFlag();
assertTrue(result);
setInMemoryCompactionFlagCounter.incrementAndGet();
return result;
}
@Override
void inMemoryCompaction() {
try {
super.inMemoryCompaction();
} finally {
try {
inMemoryCompactionEndCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
public static class MyCompactingMemStore5 extends CompactingMemStore {
private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
/**
* {@link CompactingMemStore#flattenOneSegment} must execute after
* {@link CompactingMemStore#getImmutableSegments}
*/
private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
/**
* Only after {@link CompactingMemStore#flattenOneSegment} completed,
* {@link CompactingMemStore#swapPipelineWithNull} could execute.
*/
private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
/**
* Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
* snapshot thread starts {@link CompactingMemStore#snapshot},because
* {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
*/
private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
/**
* To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
*/
private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
/**
* Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
* thread could start.
*/
private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
/**
* This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
* writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
* execute,and in memory compact thread would exit,because we expect that in memory compact
* executing only once.
*/
private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
HStore store, RegionServicesForStores regionServices,
MemoryCompactionPolicy compactionPolicy) throws IOException {
super(conf, cellComparator, store, regionServices, compactionPolicy);
}
@Override
public VersionedSegmentsList getImmutableSegments() {
VersionedSegmentsList result = super.getImmutableSegments();
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
if (currentCount <= 1) {
try {
flattenOneSegmentPreCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
return result;
}
@Override
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = swapPipelineWithNullCounter.incrementAndGet();
if (currentCount <= 1) {
try {
flattenOneSegmentPostCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
if (currentCount == 2) {
try {
/**
* Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
* writeAgain thread could start.
*/
writeMemStoreAgainStartCyclicBarrier.await();
/**
* Only the writeAgain thread completes, retry
* {@link CompactingMemStore#swapPipelineWithNull} would execute.
*/
writeMemStoreAgainEndCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
boolean result = super.swapPipelineWithNull(segments);
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
int currentCount = swapPipelineWithNullCounter.get();
if (currentCount <= 1) {
assertTrue(!result);
}
if (currentCount == 2) {
assertTrue(result);
}
}
return result;
}
@Override
public void flattenOneSegment(long requesterVersion, Action action) {
int currentCount = flattenOneSegmentCounter.incrementAndGet();
if (currentCount <= 1) {
try {
/**
* {@link CompactingMemStore#snapshot} could start.
*/
snapShotStartCyclicCyclicBarrier.await();
flattenOneSegmentPreCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
super.flattenOneSegment(requesterVersion, action);
if (currentCount <= 1) {
try {
flattenOneSegmentPostCyclicBarrier.await();
/**
* Only the writeAgain thread completes, in memory compact thread would exit,because we
* expect that in memory compact executing only once.
*/
writeMemStoreAgainEndCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
@Override
protected boolean setInMemoryCompactionFlag() {
boolean result = super.setInMemoryCompactionFlag();
int count = setInMemoryCompactionFlagCounter.incrementAndGet();
if (count <= 1) {
assertTrue(result);
}
if (count == 2) {
assertTrue(!result);
}
return result;
}
@Override
void inMemoryCompaction() {
try {
super.inMemoryCompaction();
} finally {
try {
inMemoryCompactionEndCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
}