diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e1289f8c180..99c1685bc1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore { @VisibleForTesting @Override protected List getSegments() { - List pipelineList = pipeline.getSegments(); - List list = new ArrayList(pipelineList.size() + 2); + List pipelineList = pipeline.getSegments(); + List list = new ArrayList<>(pipelineList.size() + 2); list.add(this.active); list.addAll(pipelineList); list.add(this.snapshot); @@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore { * Scanners are ordered from 0 (oldest) to newest in increasing order. */ public List getScanners(long readPt) throws IOException { - List pipelineList = pipeline.getSegments(); + List pipelineList = pipeline.getSegments(); long order = pipelineList.size(); // The list of elements in pipeline + the active element + the snapshot segment // TODO : This will change when the snapshot is made of more than one element diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 9d5df779d42..fafdbee5193 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -25,50 +25,65 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import 
org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; /** * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. - * It supports pushing a segment at the head of the pipeline and pulling a segment from the - * tail to flush to disk. - * It also supports swap operation to allow the compactor swap a subset of the segments with a new - * (compacted) one. This swap succeeds only if the version number passed with the list of segments - * to swap is the same as the current version of the pipeline. - * The pipeline version is updated whenever swapping segments or pulling the segment at the tail. + * It supports pushing a segment at the head of the pipeline and removing a segment from the + * tail when it is flushed to disk. + * It also supports a swap method that allows the in-memory compaction to swap a subset of the segments + * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version + * number passed with the list of segments to swap is the same as the current version of the + * pipeline. + * Essentially, there are two methods which can change the structure of the pipeline: pushHead() + * and swap(); the latter is used both by a flush to disk and by an in-memory compaction. + * The pipeline version is updated by swap(); it allows identifying conflicting operations at the + * suffix of the pipeline. + * + * The synchronization model is copy-on-write. Methods which change the structure of the + * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make + * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read + * method accesses the read-only copy more than once, it makes a local copy of it + * to ensure it accesses the same copy. 
+ * + * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also + * protected by a lock since they need to have a consistent (atomic) view of the pipeline list + * and version number. */ @InterfaceAudience.Private public class CompactionPipeline { private static final Log LOG = LogFactory.getLog(CompactionPipeline.class); public final static long FIXED_OVERHEAD = ClassSize - .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); - public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST; + .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); + public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST); private final RegionServicesForStores region; - private LinkedList pipeline; - private long version; + private final LinkedList pipeline = new LinkedList<>(); + // The list is volatile to avoid reading a new allocated reference before the c'tor is executed + private volatile LinkedList readOnlyCopy = new LinkedList<>(); + // Version is volatile to ensure it is atomically read when not using a lock + private volatile long version = 0; public CompactionPipeline(RegionServicesForStores region) { this.region = region; - this.pipeline = new LinkedList<>(); - this.version = 0; } public boolean pushHead(MutableSegment segment) { ImmutableSegment immutableSegment = SegmentFactory.instance(). 
createImmutableSegment(segment); synchronized (pipeline){ - return addFirst(immutableSegment); + boolean res = addFirst(immutableSegment); + readOnlyCopy = new LinkedList<>(pipeline); + return res; } } public VersionedSegmentsList getVersionedList() { synchronized (pipeline){ - List segmentList = new ArrayList<>(pipeline); - return new VersionedSegmentsList(segmentList, version); + return new VersionedSegmentsList(readOnlyCopy, version); } } @@ -93,8 +108,10 @@ public class CompactionPipeline { * During index merge op this will be false and for compaction it will be true. * @return true iff swapped tail with new segment */ - public boolean swap( - VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) { + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT", + justification="Increment is done under a synchronize block so safe") + public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment, + boolean closeSuffix) { if (versionedList.getVersion() != version) { return false; } @@ -115,6 +132,8 @@ public class CompactionPipeline { + ", and the number of cells in new segment is:" + count); } swapSuffix(suffix, segment, closeSuffix); + readOnlyCopy = new LinkedList<>(pipeline); + version++; } if (closeSuffix && region != null) { // update the global memstore size counter @@ -193,35 +212,34 @@ public class CompactionPipeline { } public boolean isEmpty() { - return pipeline.isEmpty(); + return readOnlyCopy.isEmpty(); } - public List getSegments() { - synchronized (pipeline){ - return new LinkedList<>(pipeline); - } + public List getSegments() { + return readOnlyCopy; } public long size() { - return pipeline.size(); + return readOnlyCopy.size(); } public long getMinSequenceId() { long minSequenceId = Long.MAX_VALUE; - if (!isEmpty()) { - minSequenceId = pipeline.getLast().getMinSequenceId(); + LinkedList localCopy = readOnlyCopy; + if (!localCopy.isEmpty()) { + minSequenceId = 
localCopy.getLast().getMinSequenceId(); } return minSequenceId; } public MemstoreSize getTailSize() { - if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + LinkedList localCopy = readOnlyCopy; + if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead()); } - private void swapSuffix(List suffix, ImmutableSegment segment, + private void swapSuffix(List suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { - version++; // During index merge we won't be closing the segments undergoing the merge. Segment#close() // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 6e8f831e744..ceaadbe3962 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -325,6 +325,7 @@ public class TestHeapSize { expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(CompactionPipeline.class, false); expected += ClassSize.estimateBase(LinkedList.class, false); + expected += ClassSize.estimateBase(LinkedList.class, false); expected += ClassSize.estimateBase(MemStoreCompactor.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false); if (expected != actual) { @@ -333,6 +334,7 @@ public class TestHeapSize { ClassSize.estimateBase(AtomicBoolean.class, true); ClassSize.estimateBase(CompactionPipeline.class, true); ClassSize.estimateBase(LinkedList.class, true); + ClassSize.estimateBase(LinkedList.class, true); 
ClassSize.estimateBase(MemStoreCompactor.class, true); ClassSize.estimateBase(AtomicBoolean.class, true); assertEquals(expected, actual);