HBASE-17434: New synchronization scheme for compaction pipeline
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
f7d0f15c99
commit
2f8ddf6fc5
|
@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
@VisibleForTesting
|
||||
@Override
|
||||
protected List<Segment> getSegments() {
|
||||
List<Segment> pipelineList = pipeline.getSegments();
|
||||
List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
|
||||
List<? extends Segment> pipelineList = pipeline.getSegments();
|
||||
List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
|
||||
list.add(this.active);
|
||||
list.addAll(pipelineList);
|
||||
list.add(this.snapshot);
|
||||
|
@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
* Scanners are ordered from 0 (oldest) to newest in increasing order.
|
||||
*/
|
||||
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
|
||||
List<Segment> pipelineList = pipeline.getSegments();
|
||||
List<? extends Segment> pipelineList = pipeline.getSegments();
|
||||
long order = pipelineList.size();
|
||||
// The list of elements in pipeline + the active element + the snapshot segment
|
||||
// TODO : This will change when the snapshot is made of more than one element
|
||||
|
|
|
@ -25,50 +25,65 @@ import java.util.List;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
||||
/**
|
||||
* The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
|
||||
* It supports pushing a segment at the head of the pipeline and pulling a segment from the
|
||||
* tail to flush to disk.
|
||||
* It also supports swap operation to allow the compactor swap a subset of the segments with a new
|
||||
* (compacted) one. This swap succeeds only if the version number passed with the list of segments
|
||||
* to swap is the same as the current version of the pipeline.
|
||||
* The pipeline version is updated whenever swapping segments or pulling the segment at the tail.
|
||||
* It supports pushing a segment at the head of the pipeline and removing a segment from the
|
||||
* tail when it is flushed to disk.
|
||||
* It also supports a swap method to allow the in-memory compaction to swap a subset of the segments
|
||||
* at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version
|
||||
* number passed with the list of segments to swap is the same as the current version of the
|
||||
* pipeline.
|
||||
* Essentially, there are two methods which can change the structure of the pipeline: pushHead()
|
||||
* and swap(), the latter is used both by a flush to disk and by an in-memory compaction.
|
||||
* The pipeline version is updated by swap(); it allows identifying conflicting operations at the
|
||||
* suffix of the pipeline.
|
||||
*
|
||||
* The synchronization model is copy-on-write. Methods which change the structure of the
|
||||
* pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make
|
||||
* a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read
|
||||
* method accesses the read-only copy more than once it makes a local copy of it
|
||||
* to ensure it accesses the same copy.
|
||||
*
|
||||
* The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also
|
||||
* protected by a lock since they need to have a consistent (atomic) view of the pipeline list
|
||||
* and version number.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CompactionPipeline {
|
||||
private static final Log LOG = LogFactory.getLog(CompactionPipeline.class);
|
||||
|
||||
public final static long FIXED_OVERHEAD = ClassSize
|
||||
.align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
|
||||
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
|
||||
.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
|
||||
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);
|
||||
|
||||
private final RegionServicesForStores region;
|
||||
private LinkedList<ImmutableSegment> pipeline;
|
||||
private long version;
|
||||
private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
|
||||
// The list is volatile to avoid reading a newly allocated reference before the constructor is executed
|
||||
private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
|
||||
// Version is volatile to ensure it is atomically read when not using a lock
|
||||
private volatile long version = 0;
|
||||
|
||||
public CompactionPipeline(RegionServicesForStores region) {
|
||||
this.region = region;
|
||||
this.pipeline = new LinkedList<>();
|
||||
this.version = 0;
|
||||
}
|
||||
|
||||
public boolean pushHead(MutableSegment segment) {
|
||||
ImmutableSegment immutableSegment = SegmentFactory.instance().
|
||||
createImmutableSegment(segment);
|
||||
synchronized (pipeline){
|
||||
return addFirst(immutableSegment);
|
||||
boolean res = addFirst(immutableSegment);
|
||||
readOnlyCopy = new LinkedList<>(pipeline);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
public VersionedSegmentsList getVersionedList() {
|
||||
synchronized (pipeline){
|
||||
List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
|
||||
return new VersionedSegmentsList(segmentList, version);
|
||||
return new VersionedSegmentsList(readOnlyCopy, version);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -93,8 +108,10 @@ public class CompactionPipeline {
|
|||
* During index merge op this will be false and for compaction it will be true.
|
||||
* @return true iff swapped tail with new segment
|
||||
*/
|
||||
public boolean swap(
|
||||
VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
|
||||
justification="Increment is done under a synchronize block so safe")
|
||||
public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
|
||||
boolean closeSuffix) {
|
||||
if (versionedList.getVersion() != version) {
|
||||
return false;
|
||||
}
|
||||
|
@ -115,6 +132,8 @@ public class CompactionPipeline {
|
|||
+ ", and the number of cells in new segment is:" + count);
|
||||
}
|
||||
swapSuffix(suffix, segment, closeSuffix);
|
||||
readOnlyCopy = new LinkedList<>(pipeline);
|
||||
version++;
|
||||
}
|
||||
if (closeSuffix && region != null) {
|
||||
// update the global memstore size counter
|
||||
|
@ -193,35 +212,34 @@ public class CompactionPipeline {
|
|||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return pipeline.isEmpty();
|
||||
return readOnlyCopy.isEmpty();
|
||||
}
|
||||
|
||||
public List<Segment> getSegments() {
|
||||
synchronized (pipeline){
|
||||
return new LinkedList<>(pipeline);
|
||||
}
|
||||
public List<? extends Segment> getSegments() {
|
||||
return readOnlyCopy;
|
||||
}
|
||||
|
||||
public long size() {
|
||||
return pipeline.size();
|
||||
return readOnlyCopy.size();
|
||||
}
|
||||
|
||||
public long getMinSequenceId() {
|
||||
long minSequenceId = Long.MAX_VALUE;
|
||||
if (!isEmpty()) {
|
||||
minSequenceId = pipeline.getLast().getMinSequenceId();
|
||||
LinkedList<? extends Segment> localCopy = readOnlyCopy;
|
||||
if (!localCopy.isEmpty()) {
|
||||
minSequenceId = localCopy.getLast().getMinSequenceId();
|
||||
}
|
||||
return minSequenceId;
|
||||
}
|
||||
|
||||
public MemstoreSize getTailSize() {
|
||||
if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
|
||||
return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
|
||||
LinkedList<? extends Segment> localCopy = readOnlyCopy;
|
||||
if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE;
|
||||
return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
|
||||
}
|
||||
|
||||
private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
|
||||
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
|
||||
boolean closeSegmentsInSuffix) {
|
||||
version++;
|
||||
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
|
||||
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
|
||||
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
|
||||
|
|
|
@ -325,6 +325,7 @@ public class TestHeapSize {
|
|||
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
|
||||
expected += ClassSize.estimateBase(CompactionPipeline.class, false);
|
||||
expected += ClassSize.estimateBase(LinkedList.class, false);
|
||||
expected += ClassSize.estimateBase(LinkedList.class, false);
|
||||
expected += ClassSize.estimateBase(MemStoreCompactor.class, false);
|
||||
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
|
||||
if (expected != actual) {
|
||||
|
@ -333,6 +334,7 @@ public class TestHeapSize {
|
|||
ClassSize.estimateBase(AtomicBoolean.class, true);
|
||||
ClassSize.estimateBase(CompactionPipeline.class, true);
|
||||
ClassSize.estimateBase(LinkedList.class, true);
|
||||
ClassSize.estimateBase(LinkedList.class, true);
|
||||
ClassSize.estimateBase(MemStoreCompactor.class, true);
|
||||
ClassSize.estimateBase(AtomicBoolean.class, true);
|
||||
assertEquals(expected, actual);
|
||||
|
|
Loading…
Reference in New Issue