Revert "HBASE-26384 Segment already flushed to hfile may still be remained in CompactingMemStore (#3779)"
This reverts commit 98259c679a
.
This commit is contained in:
parent
6057265388
commit
9ccbd8c668
|
@ -567,12 +567,12 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
while (!done) {
|
while (!done) {
|
||||||
iterationsCnt++;
|
iterationsCnt++;
|
||||||
VersionedSegmentsList segments = getImmutableSegments();
|
VersionedSegmentsList segments = pipeline.getVersionedList();
|
||||||
pushToSnapshot(segments.getStoreSegments());
|
pushToSnapshot(segments.getStoreSegments());
|
||||||
// swap can return false in case the pipeline was updated by ongoing compaction
|
// swap can return false in case the pipeline was updated by ongoing compaction
|
||||||
// and the version increase, the chance of it happenning is very low
|
// and the version increase, the chance of it happenning is very low
|
||||||
// In Swap: don't close segments (they are in snapshot now) and don't update the region size
|
// In Swap: don't close segments (they are in snapshot now) and don't update the region size
|
||||||
done = swapPipelineWithNull(segments);
|
done = pipeline.swap(segments, null, false, false);
|
||||||
if (iterationsCnt>2) {
|
if (iterationsCnt>2) {
|
||||||
// practically it is impossible that this loop iterates more than two times
|
// practically it is impossible that this loop iterates more than two times
|
||||||
// (because the compaction is stopped and none restarts it while in snapshot request),
|
// (because the compaction is stopped and none restarts it while in snapshot request),
|
||||||
|
@ -585,10 +585,6 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
|
|
||||||
return pipeline.swap(segments, null, false, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void pushToSnapshot(List<ImmutableSegment> segments) {
|
private void pushToSnapshot(List<ImmutableSegment> segments) {
|
||||||
if(segments.isEmpty()) return;
|
if(segments.isEmpty()) return;
|
||||||
if(segments.size() == 1 && !segments.get(0).isEmpty()) {
|
if(segments.size() == 1 && !segments.get(0).isEmpty()) {
|
||||||
|
|
|
@ -22,7 +22,6 @@ import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ListIterator;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
|
@ -65,16 +64,7 @@ public class CompactionPipeline {
|
||||||
private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
|
private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
|
||||||
// The list is volatile to avoid reading a new allocated reference before the c'tor is executed
|
// The list is volatile to avoid reading a new allocated reference before the c'tor is executed
|
||||||
private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
|
private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
|
||||||
/**
|
// Version is volatile to ensure it is atomically read when not using a lock
|
||||||
* <pre>
|
|
||||||
* Version is volatile to ensure it is atomically read when not using a lock.
|
|
||||||
* To indicate whether the suffix of pipleline changes:
|
|
||||||
* 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
|
|
||||||
* added at Head, {@link #version} not change.
|
|
||||||
* 2.for {@link CompactionPipeline#swap},{@link #version} increase.
|
|
||||||
* 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
private volatile long version = 0;
|
private volatile long version = 0;
|
||||||
|
|
||||||
public CompactionPipeline(RegionServicesForStores region) {
|
public CompactionPipeline(RegionServicesForStores region) {
|
||||||
|
@ -105,7 +95,7 @@ public class CompactionPipeline {
|
||||||
|
|
||||||
public VersionedSegmentsList getVersionedTail() {
|
public VersionedSegmentsList getVersionedTail() {
|
||||||
synchronized (pipeline){
|
synchronized (pipeline){
|
||||||
ArrayList<ImmutableSegment> segmentList = new ArrayList<>();
|
List<ImmutableSegment> segmentList = new ArrayList<>();
|
||||||
if(!pipeline.isEmpty()) {
|
if(!pipeline.isEmpty()) {
|
||||||
segmentList.add(0, pipeline.getLast());
|
segmentList.add(0, pipeline.getLast());
|
||||||
}
|
}
|
||||||
|
@ -300,15 +290,10 @@ public class CompactionPipeline {
|
||||||
return memStoreSizing.getMemStoreSize();
|
return memStoreSizing.getMemStoreSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Must be called under the {@link CompactionPipeline#pipeline} Lock.
|
|
||||||
*/
|
|
||||||
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
|
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
|
||||||
boolean closeSegmentsInSuffix) {
|
boolean closeSegmentsInSuffix) {
|
||||||
matchAndRemoveSuffixFromPipeline(suffix);
|
pipeline.removeAll(suffix);
|
||||||
if (segment != null) {
|
if(segment != null) pipeline.addLast(segment);
|
||||||
pipeline.addLast(segment);
|
|
||||||
}
|
|
||||||
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
|
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
|
||||||
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
|
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
|
||||||
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
|
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
|
||||||
|
@ -322,53 +307,11 @@ public class CompactionPipeline {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in
|
|
||||||
* {@link CompactionPipeline#pipeline} one by one from the last element to the first element of
|
|
||||||
* suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}. <br/>
|
|
||||||
* Must be called under the {@link CompactionPipeline#pipeline} Lock.
|
|
||||||
*/
|
|
||||||
private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
|
|
||||||
if (suffix.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (pipeline.size() < suffix.size()) {
|
|
||||||
throw new IllegalStateException(
|
|
||||||
"CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
|
|
||||||
+ "],pipeline size must greater than or equals suffix size");
|
|
||||||
}
|
|
||||||
|
|
||||||
ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
|
|
||||||
ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
|
|
||||||
int count = 0;
|
|
||||||
while (suffixIterator.hasPrevious()) {
|
|
||||||
Segment suffixSegment = suffixIterator.previous();
|
|
||||||
Segment pipelineSegment = pipelineIterator.previous();
|
|
||||||
if (suffixSegment != pipelineSegment) {
|
|
||||||
throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
|
|
||||||
+ " is not pipleline segment:[" + pipelineSegment + "]");
|
|
||||||
}
|
|
||||||
count++;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int index = 1; index <= count; index++) {
|
|
||||||
pipeline.pollLast();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// replacing one segment in the pipeline with a new one exactly at the same index
|
// replacing one segment in the pipeline with a new one exactly at the same index
|
||||||
// need to be called only within synchronized block
|
// need to be called only within synchronized block
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
|
|
||||||
justification = "replaceAtIndex is invoked under a synchronize block so safe")
|
|
||||||
private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
|
private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
|
||||||
pipeline.set(idx, newSegment);
|
pipeline.set(idx, newSegment);
|
||||||
readOnlyCopy = new LinkedList<>(pipeline);
|
readOnlyCopy = new LinkedList<>(pipeline);
|
||||||
// the version increment is indeed needed, because the swap uses removeAll() method of the
|
|
||||||
// linked-list that compares the objects to find what to remove.
|
|
||||||
// The flattening changes the segment object completely (creation pattern) and so
|
|
||||||
// swap will not proceed correctly after concurrent flattening.
|
|
||||||
version++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Segment getTail() {
|
public Segment getTail() {
|
||||||
|
|
|
@ -97,7 +97,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
|
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
|
||||||
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||||
|
@ -1939,225 +1938,6 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* <pre>
|
|
||||||
* This test is for HBASE-26384,
|
|
||||||
* test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
|
|
||||||
* execute concurrently.
|
|
||||||
* The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
|
|
||||||
* for both branch-2 and master):
|
|
||||||
* 1. The {@link CompactingMemStore} size exceeds
|
|
||||||
* {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
|
|
||||||
* {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a
|
|
||||||
* in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
|
|
||||||
* 2. The in memory compact thread starts and then stopping before
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment}.
|
|
||||||
* 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
|
|
||||||
* snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
|
|
||||||
* compact thread continues.
|
|
||||||
* Assuming {@link VersionedSegmentsList#version} returned from
|
|
||||||
* {@link CompactingMemStore#getImmutableSegments} is v.
|
|
||||||
* 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
|
|
||||||
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
|
|
||||||
* {@link CompactionPipeline#version} is still v.
|
|
||||||
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
|
||||||
* {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
|
|
||||||
* thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
|
|
||||||
* {@link CompactionPipeline} has changed because
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
|
|
||||||
* removed in fact and still remaining in {@link CompactionPipeline}.
|
|
||||||
*
|
|
||||||
* After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
|
|
||||||
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
|
|
||||||
* v+1.
|
|
||||||
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
|
||||||
* {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
|
|
||||||
* failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
|
|
||||||
* again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
|
|
||||||
* {@link CompactingMemStore#swapPipelineWithNull} succeeds.
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
|
|
||||||
byte[] smallValue = new byte[3];
|
|
||||||
byte[] largeValue = new byte[9];
|
|
||||||
final long timestamp = EnvironmentEdgeManager.currentTime();
|
|
||||||
final long seqId = 100;
|
|
||||||
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
|
|
||||||
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
|
|
||||||
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
|
|
||||||
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
|
|
||||||
int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
|
|
||||||
int flushByteSize = totalCellByteSize - 2;
|
|
||||||
|
|
||||||
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
|
|
||||||
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
|
|
||||||
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
|
|
||||||
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
|
|
||||||
|
|
||||||
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
|
||||||
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
|
||||||
|
|
||||||
MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
|
|
||||||
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
|
|
||||||
|
|
||||||
store.add(smallCell, new NonThreadSafeMemStoreSizing());
|
|
||||||
store.add(largeCell, new NonThreadSafeMemStoreSizing());
|
|
||||||
|
|
||||||
String oldThreadName = Thread.currentThread().getName();
|
|
||||||
try {
|
|
||||||
Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
|
|
||||||
/**
|
|
||||||
* {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
|
|
||||||
* would invoke {@link CompactingMemStore#stopCompaction}.
|
|
||||||
*/
|
|
||||||
myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
|
|
||||||
|
|
||||||
MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
|
|
||||||
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
|
|
||||||
|
|
||||||
assertTrue(memStoreSnapshot.getCellsCount() == 2);
|
|
||||||
assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
|
|
||||||
VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
|
|
||||||
assertTrue(segments.getNumOfSegments() == 0);
|
|
||||||
assertTrue(segments.getNumOfCells() == 0);
|
|
||||||
assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
|
|
||||||
assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
|
|
||||||
} finally {
|
|
||||||
Thread.currentThread().setName(oldThreadName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <pre>
|
|
||||||
* This test is for HBASE-26384,
|
|
||||||
* test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
|
|
||||||
* and writeMemStore execute concurrently.
|
|
||||||
* The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
|
|
||||||
* for both branch-2 and master):
|
|
||||||
* 1. The {@link CompactingMemStore} size exceeds
|
|
||||||
* {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
|
|
||||||
* {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a
|
|
||||||
* in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
|
|
||||||
* 2. The in memory compact thread starts and then stopping before
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment}.
|
|
||||||
* 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
|
|
||||||
* snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
|
|
||||||
* compact thread continues.
|
|
||||||
* Assuming {@link VersionedSegmentsList#version} returned from
|
|
||||||
* {@link CompactingMemStore#getImmutableSegments} is v.
|
|
||||||
* 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
|
|
||||||
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
|
|
||||||
* {@link CompactionPipeline#version} is still v.
|
|
||||||
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
|
||||||
* {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
|
|
||||||
* thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
|
|
||||||
* {@link CompactionPipeline} has changed because
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
|
|
||||||
* removed in fact and still remaining in {@link CompactionPipeline}.
|
|
||||||
*
|
|
||||||
* After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
|
|
||||||
* and I add step 7-8 to test there is new segment added before retry.
|
|
||||||
* 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
|
|
||||||
* v+1.
|
|
||||||
* 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
|
||||||
* {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
|
|
||||||
* failed and retry,{@link VersionedSegmentsList#version} returned from
|
|
||||||
* {@link CompactingMemStore#getImmutableSegments} is v+1.
|
|
||||||
* 7. The write thread continues writing to {@link CompactingMemStore} and
|
|
||||||
* {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
|
|
||||||
* {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
|
|
||||||
* {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
|
|
||||||
* {@link CompactionPipeline#version} is still v+1.
|
|
||||||
* 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
|
|
||||||
* {@link CompactionPipeline#version} is still v+1,
|
|
||||||
* {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
|
|
||||||
* remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
|
|
||||||
* {@link CompactingMemStore#swapPipelineWithNull}.
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
|
|
||||||
byte[] smallValue = new byte[3];
|
|
||||||
byte[] largeValue = new byte[9];
|
|
||||||
final long timestamp = EnvironmentEdgeManager.currentTime();
|
|
||||||
final long seqId = 100;
|
|
||||||
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
|
|
||||||
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
|
|
||||||
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
|
|
||||||
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
|
|
||||||
int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
|
|
||||||
int flushByteSize = firstWriteCellByteSize - 2;
|
|
||||||
|
|
||||||
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
|
|
||||||
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
|
|
||||||
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
|
|
||||||
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
|
|
||||||
|
|
||||||
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
|
||||||
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
|
||||||
|
|
||||||
final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
|
|
||||||
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
|
|
||||||
|
|
||||||
store.add(smallCell, new NonThreadSafeMemStoreSizing());
|
|
||||||
store.add(largeCell, new NonThreadSafeMemStoreSizing());
|
|
||||||
|
|
||||||
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
|
|
||||||
final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
|
|
||||||
final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
|
|
||||||
final int writeAgainCellByteSize = MutableSegment.getCellLength(writeAgainCell1)
|
|
||||||
+ MutableSegment.getCellLength(writeAgainCell2);
|
|
||||||
final Thread writeAgainThread = new Thread(() -> {
|
|
||||||
try {
|
|
||||||
myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
|
|
||||||
|
|
||||||
store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
|
|
||||||
store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
|
|
||||||
|
|
||||||
myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
|
|
||||||
} catch (Throwable exception) {
|
|
||||||
exceptionRef.set(exception);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
|
|
||||||
writeAgainThread.start();
|
|
||||||
|
|
||||||
String oldThreadName = Thread.currentThread().getName();
|
|
||||||
try {
|
|
||||||
Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
|
|
||||||
/**
|
|
||||||
* {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
|
|
||||||
* would invoke {@link CompactingMemStore#stopCompaction}.
|
|
||||||
*/
|
|
||||||
myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
|
|
||||||
MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
|
|
||||||
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
|
|
||||||
writeAgainThread.join();
|
|
||||||
|
|
||||||
assertTrue(memStoreSnapshot.getCellsCount() == 2);
|
|
||||||
assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
|
|
||||||
VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
|
|
||||||
assertTrue(segments.getNumOfSegments() == 1);
|
|
||||||
assertTrue(
|
|
||||||
((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
|
|
||||||
assertTrue(segments.getNumOfCells() == 2);
|
|
||||||
assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
|
|
||||||
assertTrue(exceptionRef.get() == null);
|
|
||||||
assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
|
|
||||||
} finally {
|
|
||||||
Thread.currentThread().setName(oldThreadName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private HStoreFile mockStoreFileWithLength(long length) {
|
private HStoreFile mockStoreFileWithLength(long length) {
|
||||||
HStoreFile sf = mock(HStoreFile.class);
|
HStoreFile sf = mock(HStoreFile.class);
|
||||||
StoreFileReader sfr = mock(StoreFileReader.class);
|
StoreFileReader sfr = mock(StoreFileReader.class);
|
||||||
|
@ -2523,293 +2303,4 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class MyCompactingMemStore4 extends CompactingMemStore {
|
|
||||||
private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
|
|
||||||
/**
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment} must execute after
|
|
||||||
* {@link CompactingMemStore#getImmutableSegments}
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
|
|
||||||
/**
|
|
||||||
* Only after {@link CompactingMemStore#flattenOneSegment} completed,
|
|
||||||
* {@link CompactingMemStore#swapPipelineWithNull} could execute.
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
|
|
||||||
/**
|
|
||||||
* Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
|
|
||||||
* snapshot thread starts {@link CompactingMemStore#snapshot},because
|
|
||||||
* {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
|
|
||||||
/**
|
|
||||||
* To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
|
|
||||||
private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
|
|
||||||
private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
|
|
||||||
private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
|
|
||||||
private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
|
|
||||||
|
|
||||||
public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
|
|
||||||
HStore store, RegionServicesForStores regionServices,
|
|
||||||
MemoryCompactionPolicy compactionPolicy) throws IOException {
|
|
||||||
super(conf, cellComparator, store, regionServices, compactionPolicy);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VersionedSegmentsList getImmutableSegments() {
|
|
||||||
VersionedSegmentsList result = super.getImmutableSegments();
|
|
||||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
|
||||||
int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
try {
|
|
||||||
flattenOneSegmentPreCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
|
|
||||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
|
||||||
int currentCount = swapPipelineWithNullCounter.incrementAndGet();
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
try {
|
|
||||||
flattenOneSegmentPostCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
boolean result = super.swapPipelineWithNull(segments);
|
|
||||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
|
||||||
int currentCount = swapPipelineWithNullCounter.get();
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
assertTrue(!result);
|
|
||||||
}
|
|
||||||
if (currentCount == 2) {
|
|
||||||
assertTrue(result);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void flattenOneSegment(long requesterVersion, Action action) {
|
|
||||||
int currentCount = flattenOneSegmentCounter.incrementAndGet();
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
try {
|
|
||||||
/**
|
|
||||||
* {@link CompactingMemStore#snapshot} could start.
|
|
||||||
*/
|
|
||||||
snapShotStartCyclicCyclicBarrier.await();
|
|
||||||
flattenOneSegmentPreCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
super.flattenOneSegment(requesterVersion, action);
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
try {
|
|
||||||
flattenOneSegmentPostCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean setInMemoryCompactionFlag() {
|
|
||||||
boolean result = super.setInMemoryCompactionFlag();
|
|
||||||
assertTrue(result);
|
|
||||||
setInMemoryCompactionFlagCounter.incrementAndGet();
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void inMemoryCompaction() {
|
|
||||||
try {
|
|
||||||
super.inMemoryCompaction();
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
inMemoryCompactionEndCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class MyCompactingMemStore5 extends CompactingMemStore {
|
|
||||||
private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
|
|
||||||
private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
|
|
||||||
/**
|
|
||||||
* {@link CompactingMemStore#flattenOneSegment} must execute after
|
|
||||||
* {@link CompactingMemStore#getImmutableSegments}
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
|
|
||||||
/**
|
|
||||||
* Only after {@link CompactingMemStore#flattenOneSegment} completed,
|
|
||||||
* {@link CompactingMemStore#swapPipelineWithNull} could execute.
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
|
|
||||||
/**
|
|
||||||
* Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
|
|
||||||
* snapshot thread starts {@link CompactingMemStore#snapshot},because
|
|
||||||
* {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
|
|
||||||
/**
|
|
||||||
* To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
|
|
||||||
private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
|
|
||||||
private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
|
|
||||||
private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
|
|
||||||
private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
|
|
||||||
/**
|
|
||||||
* Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
|
|
||||||
* thread could start.
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
|
|
||||||
/**
|
|
||||||
* This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
|
|
||||||
* writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
|
|
||||||
* execute,and in memory compact thread would exit,because we expect that in memory compact
|
|
||||||
* executing only once.
|
|
||||||
*/
|
|
||||||
private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
|
|
||||||
|
|
||||||
public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
|
|
||||||
HStore store, RegionServicesForStores regionServices,
|
|
||||||
MemoryCompactionPolicy compactionPolicy) throws IOException {
|
|
||||||
super(conf, cellComparator, store, regionServices, compactionPolicy);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VersionedSegmentsList getImmutableSegments() {
|
|
||||||
VersionedSegmentsList result = super.getImmutableSegments();
|
|
||||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
|
||||||
int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
try {
|
|
||||||
flattenOneSegmentPreCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
|
|
||||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
|
||||||
int currentCount = swapPipelineWithNullCounter.incrementAndGet();
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
try {
|
|
||||||
flattenOneSegmentPostCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (currentCount == 2) {
|
|
||||||
try {
|
|
||||||
/**
|
|
||||||
* Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
|
|
||||||
* writeAgain thread could start.
|
|
||||||
*/
|
|
||||||
writeMemStoreAgainStartCyclicBarrier.await();
|
|
||||||
/**
|
|
||||||
* Only the writeAgain thread completes, retry
|
|
||||||
* {@link CompactingMemStore#swapPipelineWithNull} would execute.
|
|
||||||
*/
|
|
||||||
writeMemStoreAgainEndCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
boolean result = super.swapPipelineWithNull(segments);
|
|
||||||
if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
|
|
||||||
int currentCount = swapPipelineWithNullCounter.get();
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
assertTrue(!result);
|
|
||||||
}
|
|
||||||
if (currentCount == 2) {
|
|
||||||
assertTrue(result);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void flattenOneSegment(long requesterVersion, Action action) {
|
|
||||||
int currentCount = flattenOneSegmentCounter.incrementAndGet();
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
try {
|
|
||||||
/**
|
|
||||||
* {@link CompactingMemStore#snapshot} could start.
|
|
||||||
*/
|
|
||||||
snapShotStartCyclicCyclicBarrier.await();
|
|
||||||
flattenOneSegmentPreCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
super.flattenOneSegment(requesterVersion, action);
|
|
||||||
if (currentCount <= 1) {
|
|
||||||
try {
|
|
||||||
flattenOneSegmentPostCyclicBarrier.await();
|
|
||||||
/**
|
|
||||||
* Only the writeAgain thread completes, in memory compact thread would exit,because we
|
|
||||||
* expect that in memory compact executing only once.
|
|
||||||
*/
|
|
||||||
writeMemStoreAgainEndCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean setInMemoryCompactionFlag() {
|
|
||||||
boolean result = super.setInMemoryCompactionFlag();
|
|
||||||
int count = setInMemoryCompactionFlagCounter.incrementAndGet();
|
|
||||||
if (count <= 1) {
|
|
||||||
assertTrue(result);
|
|
||||||
}
|
|
||||||
if (count == 2) {
|
|
||||||
assertTrue(!result);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void inMemoryCompaction() {
|
|
||||||
try {
|
|
||||||
super.inMemoryCompaction();
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
inMemoryCompactionEndCyclicBarrier.await();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue