From bd157ffe9a72cd723f238f1685608da573bb0df7 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Mon, 9 Jan 2017 15:40:01 -0800 Subject: [PATCH] Revert "HBASE-17434 New Synchronization Scheme for Compaction Pipeline (Eshcar Hillel)" Undo premature commit This reverts commit 1576269123f18c9eb21b04a800e81952ec52c04d. --- .../regionserver/CompactingMemStore.java | 6 +- .../regionserver/CompactionPipeline.java | 76 ++++++++----------- .../apache/hadoop/hbase/io/TestHeapSize.java | 2 - 3 files changed, 33 insertions(+), 51 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 99c1685bc1b..e1289f8c180 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore { @VisibleForTesting @Override protected List getSegments() { - List pipelineList = pipeline.getSegments(); - List list = new ArrayList<>(pipelineList.size() + 2); + List pipelineList = pipeline.getSegments(); + List list = new ArrayList(pipelineList.size() + 2); list.add(this.active); list.addAll(pipelineList); list.add(this.snapshot); @@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore { * Scanners are ordered from 0 (oldest) to newest in increasing order. */ public List getScanners(long readPt) throws IOException { - List pipelineList = pipeline.getSegments(); + List pipelineList = pipeline.getSegments(); long order = pipelineList.size(); // The list of elements in pipeline + the active element + the snapshot segment // TODO : This will change when the snapshot is made of more than one element diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index ebc8c4b8fe8..9d5df779d42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -25,63 +25,50 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; /** * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. - * It supports pushing a segment at the head of the pipeline and removing a segment from the - * tail when it is flushed to disk. - * It also supports swap method to allow the in-memory compaction swap a subset of the segments - * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version - * number passed with the list of segments to swap is the same as the current version of the - * pipeline. - * Essentially, there are two methods which can change the structure of the pipeline: pushHead() - * and swap(), the later is used both by a flush to disk and by an in-memory compaction. - * The pipeline version is updated by swap(); it allows to identify conflicting operations at the - * suffix of the pipeline. - * - * The synchronization model is copy-on-write. Methods which change the structure of the - * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make - * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read - * method accesses the read-only copy more than once it makes a local copy of it - * to ensure it accesses the same copy. - * - * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also - * protected by a lock since they need to have a consistent (atomic) view of the pipeline lsit - * and version number. + * It supports pushing a segment at the head of the pipeline and pulling a segment from the + * tail to flush to disk. + * It also supports swap operation to allow the compactor swap a subset of the segments with a new + * (compacted) one. This swap succeeds only if the version number passed with the list of segments + * to swap is the same as the current version of the pipeline. + * The pipeline version is updated whenever swapping segments or pulling the segment at the tail. */ @InterfaceAudience.Private public class CompactionPipeline { private static final Log LOG = LogFactory.getLog(CompactionPipeline.class); public final static long FIXED_OVERHEAD = ClassSize - .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); - public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST); + .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); + public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST; private final RegionServicesForStores region; - private LinkedList pipeline = new LinkedList<>(); - private LinkedList readOnlyCopy = new LinkedList<>(); - private volatile long version = 0; + private LinkedList pipeline; + private long version; public CompactionPipeline(RegionServicesForStores region) { this.region = region; + this.pipeline = new LinkedList<>(); + this.version = 0; } public boolean pushHead(MutableSegment segment) { ImmutableSegment immutableSegment = SegmentFactory.instance(). createImmutableSegment(segment); synchronized (pipeline){ - boolean res = addFirst(immutableSegment); - readOnlyCopy = new LinkedList<>(pipeline); - return res; + return addFirst(immutableSegment); } } public VersionedSegmentsList getVersionedList() { synchronized (pipeline){ - return new VersionedSegmentsList(readOnlyCopy, version); + List segmentList = new ArrayList<>(pipeline); + return new VersionedSegmentsList(segmentList, version); } } @@ -106,10 +93,8 @@ public class CompactionPipeline { * During index merge op this will be false and for compaction it will be true. * @return true iff swapped tail with new segment */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT", - justification="Increment is done under a synchronize block so safe") - public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment, - boolean closeSuffix) { + public boolean swap( + VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) { if (versionedList.getVersion() != version) { return false; } @@ -130,8 +115,6 @@ public class CompactionPipeline { + ", and the number of cells in new segment is:" + count); } swapSuffix(suffix, segment, closeSuffix); - readOnlyCopy = new LinkedList<>(pipeline); - version++; } if (closeSuffix && region != null) { // update the global memstore size counter @@ -210,34 +193,35 @@ public class CompactionPipeline { } public boolean isEmpty() { - return readOnlyCopy.isEmpty(); + return pipeline.isEmpty(); } - public List getSegments() { - return readOnlyCopy; + public List getSegments() { + synchronized (pipeline){ + return new LinkedList<>(pipeline); + } } public long size() { - return readOnlyCopy.size(); + return pipeline.size(); } public long getMinSequenceId() { long minSequenceId = Long.MAX_VALUE; - LinkedList localCopy = readOnlyCopy; - if (!localCopy.isEmpty()) { - minSequenceId = localCopy.getLast().getMinSequenceId(); + if (!isEmpty()) { + minSequenceId = pipeline.getLast().getMinSequenceId(); } return minSequenceId; } public MemstoreSize getTailSize() { - LinkedList localCopy = readOnlyCopy; - if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead()); + if (isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); } - private void swapSuffix(List suffix, ImmutableSegment segment, + private void swapSuffix(List suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { + version++; // During index merge we won't be closing the segments undergoing the merge. Segment#close() // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index ceaadbe3962..6e8f831e744 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -325,7 +325,6 @@ public class TestHeapSize { expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(CompactionPipeline.class, false); expected += ClassSize.estimateBase(LinkedList.class, false); - expected += ClassSize.estimateBase(LinkedList.class, false); expected += ClassSize.estimateBase(MemStoreCompactor.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false); if (expected != actual) { @@ -334,7 +333,6 @@ public class TestHeapSize { ClassSize.estimateBase(AtomicBoolean.class, true); ClassSize.estimateBase(CompactionPipeline.class, true); ClassSize.estimateBase(LinkedList.class, true); - ClassSize.estimateBase(LinkedList.class, true); ClassSize.estimateBase(MemStoreCompactor.class, true); ClassSize.estimateBase(AtomicBoolean.class, true); assertEquals(expected, actual);