HBASE-26384 Segment already flushed to hfile may still be remained in CompactingMemStore (#3777)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
ff11f1115f
commit
558ab925ed
|
@ -567,12 +567,12 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
boolean done = false;
|
||||
while (!done) {
|
||||
iterationsCnt++;
|
||||
VersionedSegmentsList segments = pipeline.getVersionedList();
|
||||
VersionedSegmentsList segments = getImmutableSegments();
|
||||
pushToSnapshot(segments.getStoreSegments());
|
||||
// swap can return false in case the pipeline was updated by ongoing compaction
|
||||
// and the version increase, the chance of it happenning is very low
|
||||
// In Swap: don't close segments (they are in snapshot now) and don't update the region size
|
||||
done = pipeline.swap(segments, null, false, false);
|
||||
done = swapPipelineWithNull(segments);
|
||||
if (iterationsCnt>2) {
|
||||
// practically it is impossible that this loop iterates more than two times
|
||||
// (because the compaction is stopped and none restarts it while in snapshot request),
|
||||
|
@ -585,6 +585,10 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
}
|
||||
}
|
||||
|
||||
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
|
||||
return pipeline.swap(segments, null, false, false);
|
||||
}
|
||||
|
||||
private void pushToSnapshot(List<ImmutableSegment> segments) {
|
||||
if(segments.isEmpty()) return;
|
||||
if(segments.size() == 1 && !segments.get(0).isEmpty()) {
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.ArrayList;
|
|||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
@ -64,7 +65,16 @@ public class CompactionPipeline {
|
|||
private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
|
||||
// The list is volatile to avoid reading a new allocated reference before the c'tor is executed
|
||||
private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
|
||||
// Version is volatile to ensure it is atomically read when not using a lock
|
||||
/**
|
||||
* <pre>
|
||||
* Version is volatile to ensure it is atomically read when not using a lock.
|
||||
* To indicate whether the suffix of pipleline changes:
|
||||
* 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
|
||||
* added at Head, {@link #version} not change.
|
||||
* 2.for {@link CompactionPipeline#swap},{@link #version} increase.
|
||||
* 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
|
||||
* </pre>
|
||||
*/
|
||||
private volatile long version = 0;
|
||||
|
||||
public CompactionPipeline(RegionServicesForStores region) {
|
||||
|
@ -290,10 +300,15 @@ public class CompactionPipeline {
|
|||
return memStoreSizing.getMemStoreSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Must be called under the {@link CompactionPipeline#pipeline} Lock.
|
||||
*/
|
||||
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
|
||||
boolean closeSegmentsInSuffix) {
|
||||
pipeline.removeAll(suffix);
|
||||
if(segment != null) pipeline.addLast(segment);
|
||||
matchAndRemoveSuffixFromPipeline(suffix);
|
||||
if (segment != null) {
|
||||
pipeline.addLast(segment);
|
||||
}
|
||||
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
|
||||
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
|
||||
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
|
||||
|
@ -307,6 +322,41 @@ public class CompactionPipeline {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in
|
||||
* {@link CompactionPipeline#pipeline} one by one from the last element to the first element of
|
||||
* suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}. <br/>
|
||||
* Must be called under the {@link CompactionPipeline#pipeline} Lock.
|
||||
*/
|
||||
private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
|
||||
if (suffix.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
if (pipeline.size() < suffix.size()) {
|
||||
throw new IllegalStateException(
|
||||
"CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
|
||||
+ "],pipeline size must greater than or equals suffix size");
|
||||
}
|
||||
|
||||
ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
|
||||
ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
|
||||
int count = 0;
|
||||
while (suffixIterator.hasPrevious()) {
|
||||
Segment suffixSegment = suffixIterator.previous();
|
||||
Segment pipelineSegment = pipelineIterator.previous();
|
||||
if (suffixSegment != pipelineSegment) {
|
||||
throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
|
||||
+ " is not pipleline segment:[" + pipelineSegment + "]");
|
||||
}
|
||||
count++;
|
||||
}
|
||||
|
||||
for (int index = 1; index <= count; index++) {
|
||||
pipeline.pollLast();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// replacing one segment in the pipeline with a new one exactly at the same index
|
||||
// need to be called only within synchronized block
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
|
||||
|
|
|
@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||
|
@ -1921,6 +1922,225 @@ public class TestHStore {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <pre>
|
||||
* This test is for HBASE-26384,
|
||||
* test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
|
||||
* execute concurrently.
|
||||
* The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
|
||||
* for both branch-2 and master):
|
||||
* 1. The {@link CompactingMemStore} size exceeds
|
||||
* {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
|
||||
* {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a
|
||||
* in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
|
||||
* 2. The in memory compact thread starts and then stopping before
|
||||
* {@link CompactingMemStore#flattenOneSegment}.
|
||||
* 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
|
||||
* snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
|
||||
* compact thread continues.
|
||||
* Assuming {@link VersionedSegmentsList#version} returned from
|
||||
* {@link CompactingMemStore#getImmutableSegments} is v.
|
||||
* 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
|
||||
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
|
||||
* {@link CompactionPipeline#version} is still v.
|
||||
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
||||
* {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
|
||||
* thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
|
||||
* {@link CompactionPipeline} has changed because
|
||||
* {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
|
||||
* removed in fact and still remaining in {@link CompactionPipeline}.
|
||||
*
|
||||
* After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
|
||||
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
|
||||
* {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
|
||||
* v+1.
|
||||
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
||||
* {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
|
||||
* failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
|
||||
* again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
|
||||
* {@link CompactingMemStore#swapPipelineWithNull} succeeds.
|
||||
* </pre>
|
||||
*/
|
||||
@Test
|
||||
public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
|
||||
byte[] smallValue = new byte[3];
|
||||
byte[] largeValue = new byte[9];
|
||||
final long timestamp = EnvironmentEdgeManager.currentTime();
|
||||
final long seqId = 100;
|
||||
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
|
||||
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
|
||||
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
|
||||
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
|
||||
int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
|
||||
int flushByteSize = totalCellByteSize - 2;
|
||||
|
||||
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
|
||||
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
|
||||
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
|
||||
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
|
||||
|
||||
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
||||
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
||||
|
||||
MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
|
||||
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
|
||||
|
||||
store.add(smallCell, new NonThreadSafeMemStoreSizing());
|
||||
store.add(largeCell, new NonThreadSafeMemStoreSizing());
|
||||
|
||||
String oldThreadName = Thread.currentThread().getName();
|
||||
try {
|
||||
Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
|
||||
/**
|
||||
* {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
|
||||
* {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
|
||||
* would invoke {@link CompactingMemStore#stopCompaction}.
|
||||
*/
|
||||
myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
|
||||
|
||||
MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
|
||||
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
|
||||
|
||||
assertTrue(memStoreSnapshot.getCellsCount() == 2);
|
||||
assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
|
||||
VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
|
||||
assertTrue(segments.getNumOfSegments() == 0);
|
||||
assertTrue(segments.getNumOfCells() == 0);
|
||||
assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
|
||||
assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
|
||||
} finally {
|
||||
Thread.currentThread().setName(oldThreadName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <pre>
|
||||
* This test is for HBASE-26384,
|
||||
* test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
|
||||
* and writeMemStore execute concurrently.
|
||||
* The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
|
||||
* for both branch-2 and master):
|
||||
* 1. The {@link CompactingMemStore} size exceeds
|
||||
* {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
|
||||
* {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a
|
||||
* in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
|
||||
* 2. The in memory compact thread starts and then stopping before
|
||||
* {@link CompactingMemStore#flattenOneSegment}.
|
||||
* 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
|
||||
* snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
|
||||
* compact thread continues.
|
||||
* Assuming {@link VersionedSegmentsList#version} returned from
|
||||
* {@link CompactingMemStore#getImmutableSegments} is v.
|
||||
* 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
|
||||
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
|
||||
* {@link CompactionPipeline#version} is still v.
|
||||
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
||||
* {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
|
||||
* thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
|
||||
* {@link CompactionPipeline} has changed because
|
||||
* {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
|
||||
* removed in fact and still remaining in {@link CompactionPipeline}.
|
||||
*
|
||||
* After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
|
||||
* and I add step 7-8 to test there is new segment added before retry.
|
||||
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
|
||||
* {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
|
||||
* v+1.
|
||||
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
||||
* {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
|
||||
* failed and retry,{@link VersionedSegmentsList#version} returned from
|
||||
* {@link CompactingMemStore#getImmutableSegments} is v+1.
|
||||
* 7. The write thread continues writing to {@link CompactingMemStore} and
|
||||
* {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
|
||||
* {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
|
||||
* {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
|
||||
* {@link CompactionPipeline#version} is still v+1.
|
||||
* 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
||||
* {@link CompactionPipeline#version} is still v+1,
|
||||
* {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
|
||||
* remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
|
||||
* {@link CompactingMemStore#swapPipelineWithNull}.
|
||||
* </pre>
|
||||
*/
|
||||
@Test
|
||||
public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
|
||||
byte[] smallValue = new byte[3];
|
||||
byte[] largeValue = new byte[9];
|
||||
final long timestamp = EnvironmentEdgeManager.currentTime();
|
||||
final long seqId = 100;
|
||||
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
|
||||
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
|
||||
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
|
||||
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
|
||||
int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
|
||||
int flushByteSize = firstWriteCellByteSize - 2;
|
||||
|
||||
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
|
||||
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
|
||||
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
|
||||
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
|
||||
|
||||
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
||||
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
||||
|
||||
final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
|
||||
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
|
||||
|
||||
store.add(smallCell, new NonThreadSafeMemStoreSizing());
|
||||
store.add(largeCell, new NonThreadSafeMemStoreSizing());
|
||||
|
||||
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
|
||||
final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
|
||||
final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
|
||||
final int writeAgainCellByteSize = MutableSegment.getCellLength(writeAgainCell1)
|
||||
+ MutableSegment.getCellLength(writeAgainCell2);
|
||||
final Thread writeAgainThread = new Thread(() -> {
|
||||
try {
|
||||
myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
|
||||
|
||||
store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
|
||||
store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
|
||||
|
||||
myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
|
||||
} catch (Throwable exception) {
|
||||
exceptionRef.set(exception);
|
||||
}
|
||||
});
|
||||
writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
|
||||
writeAgainThread.start();
|
||||
|
||||
String oldThreadName = Thread.currentThread().getName();
|
||||
try {
|
||||
Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
|
||||
/**
|
||||
* {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
|
||||
* {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
|
||||
* would invoke {@link CompactingMemStore#stopCompaction}.
|
||||
*/
|
||||
myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
|
||||
MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
|
||||
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
|
||||
writeAgainThread.join();
|
||||
|
||||
assertTrue(memStoreSnapshot.getCellsCount() == 2);
|
||||
assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
|
||||
VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
|
||||
assertTrue(segments.getNumOfSegments() == 1);
|
||||
assertTrue(
|
||||
((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
|
||||
assertTrue(segments.getNumOfCells() == 2);
|
||||
assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
|
||||
assertTrue(exceptionRef.get() == null);
|
||||
assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
|
||||
} finally {
|
||||
Thread.currentThread().setName(oldThreadName);
|
||||
}
|
||||
}
|
||||
|
||||
private HStoreFile mockStoreFileWithLength(long length) {
|
||||
HStoreFile sf = mock(HStoreFile.class);
|
||||
StoreFileReader sfr = mock(StoreFileReader.class);
|
||||
|
@ -2286,4 +2506,293 @@ public class TestHStore {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
public static class MyCompactingMemStore4 extends CompactingMemStore {
|
||||
private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
|
||||
/**
|
||||
* {@link CompactingMemStore#flattenOneSegment} must execute after
|
||||
* {@link CompactingMemStore#getImmutableSegments}
|
||||
*/
|
||||
private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
|
||||
/**
|
||||
* Only after {@link CompactingMemStore#flattenOneSegment} completed,
|
||||
* {@link CompactingMemStore#swapPipelineWithNull} could execute.
|
||||
*/
|
||||
private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
|
||||
/**
|
||||
* Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
|
||||
* snapshot thread starts {@link CompactingMemStore#snapshot},because
|
||||
* {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
|
||||
*/
|
||||
private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
|
||||
/**
|
||||
* To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
|
||||
*/
|
||||
private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
|
||||
private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
|
||||
|
||||
public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
|
||||
HStore store, RegionServicesForStores regionServices,
|
||||
MemoryCompactionPolicy compactionPolicy) throws IOException {
|
||||
super(conf, cellComparator, store, regionServices, compactionPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public VersionedSegmentsList getImmutableSegments() {
|
||||
VersionedSegmentsList result = super.getImmutableSegments();
|
||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
||||
int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
|
||||
if (currentCount <= 1) {
|
||||
try {
|
||||
flattenOneSegmentPreCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
|
||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
||||
int currentCount = swapPipelineWithNullCounter.incrementAndGet();
|
||||
if (currentCount <= 1) {
|
||||
try {
|
||||
flattenOneSegmentPostCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
boolean result = super.swapPipelineWithNull(segments);
|
||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
||||
int currentCount = swapPipelineWithNullCounter.get();
|
||||
if (currentCount <= 1) {
|
||||
assertTrue(!result);
|
||||
}
|
||||
if (currentCount == 2) {
|
||||
assertTrue(result);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flattenOneSegment(long requesterVersion, Action action) {
|
||||
int currentCount = flattenOneSegmentCounter.incrementAndGet();
|
||||
if (currentCount <= 1) {
|
||||
try {
|
||||
/**
|
||||
* {@link CompactingMemStore#snapshot} could start.
|
||||
*/
|
||||
snapShotStartCyclicCyclicBarrier.await();
|
||||
flattenOneSegmentPreCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
super.flattenOneSegment(requesterVersion, action);
|
||||
if (currentCount <= 1) {
|
||||
try {
|
||||
flattenOneSegmentPostCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean setInMemoryCompactionFlag() {
|
||||
boolean result = super.setInMemoryCompactionFlag();
|
||||
assertTrue(result);
|
||||
setInMemoryCompactionFlagCounter.incrementAndGet();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
void inMemoryCompaction() {
|
||||
try {
|
||||
super.inMemoryCompaction();
|
||||
} finally {
|
||||
try {
|
||||
inMemoryCompactionEndCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class MyCompactingMemStore5 extends CompactingMemStore {
|
||||
private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
|
||||
private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
|
||||
/**
|
||||
* {@link CompactingMemStore#flattenOneSegment} must execute after
|
||||
* {@link CompactingMemStore#getImmutableSegments}
|
||||
*/
|
||||
private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
|
||||
/**
|
||||
* Only after {@link CompactingMemStore#flattenOneSegment} completed,
|
||||
* {@link CompactingMemStore#swapPipelineWithNull} could execute.
|
||||
*/
|
||||
private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
|
||||
/**
|
||||
* Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
|
||||
* snapshot thread starts {@link CompactingMemStore#snapshot},because
|
||||
* {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
|
||||
*/
|
||||
private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
|
||||
/**
|
||||
* To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
|
||||
*/
|
||||
private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
|
||||
private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
|
||||
/**
|
||||
* Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
|
||||
* thread could start.
|
||||
*/
|
||||
private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
|
||||
/**
|
||||
* This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
|
||||
* writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
|
||||
* execute,and in memory compact thread would exit,because we expect that in memory compact
|
||||
* executing only once.
|
||||
*/
|
||||
private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
|
||||
|
||||
public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
|
||||
HStore store, RegionServicesForStores regionServices,
|
||||
MemoryCompactionPolicy compactionPolicy) throws IOException {
|
||||
super(conf, cellComparator, store, regionServices, compactionPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public VersionedSegmentsList getImmutableSegments() {
|
||||
VersionedSegmentsList result = super.getImmutableSegments();
|
||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
||||
int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
|
||||
if (currentCount <= 1) {
|
||||
try {
|
||||
flattenOneSegmentPreCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
|
||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
||||
int currentCount = swapPipelineWithNullCounter.incrementAndGet();
|
||||
if (currentCount <= 1) {
|
||||
try {
|
||||
flattenOneSegmentPostCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
if (currentCount == 2) {
|
||||
try {
|
||||
/**
|
||||
* Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
|
||||
* writeAgain thread could start.
|
||||
*/
|
||||
writeMemStoreAgainStartCyclicBarrier.await();
|
||||
/**
|
||||
* Only the writeAgain thread completes, retry
|
||||
* {@link CompactingMemStore#swapPipelineWithNull} would execute.
|
||||
*/
|
||||
writeMemStoreAgainEndCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
boolean result = super.swapPipelineWithNull(segments);
|
||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
||||
int currentCount = swapPipelineWithNullCounter.get();
|
||||
if (currentCount <= 1) {
|
||||
assertTrue(!result);
|
||||
}
|
||||
if (currentCount == 2) {
|
||||
assertTrue(result);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flattenOneSegment(long requesterVersion, Action action) {
|
||||
int currentCount = flattenOneSegmentCounter.incrementAndGet();
|
||||
if (currentCount <= 1) {
|
||||
try {
|
||||
/**
|
||||
* {@link CompactingMemStore#snapshot} could start.
|
||||
*/
|
||||
snapShotStartCyclicCyclicBarrier.await();
|
||||
flattenOneSegmentPreCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
super.flattenOneSegment(requesterVersion, action);
|
||||
if (currentCount <= 1) {
|
||||
try {
|
||||
flattenOneSegmentPostCyclicBarrier.await();
|
||||
/**
|
||||
* Only the writeAgain thread completes, in memory compact thread would exit,because we
|
||||
* expect that in memory compact executing only once.
|
||||
*/
|
||||
writeMemStoreAgainEndCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean setInMemoryCompactionFlag() {
|
||||
boolean result = super.setInMemoryCompactionFlag();
|
||||
int count = setInMemoryCompactionFlagCounter.incrementAndGet();
|
||||
if (count <= 1) {
|
||||
assertTrue(result);
|
||||
}
|
||||
if (count == 2) {
|
||||
assertTrue(!result);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
void inMemoryCompaction() {
|
||||
try {
|
||||
super.inMemoryCompaction();
|
||||
} finally {
|
||||
try {
|
||||
inMemoryCompactionEndCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue