HBASE-17081 Flush the entire CompactingMemStore content to disk - recommit
(Anastasia)
This commit is contained in:
parent
805d39fca6
commit
b779143fdc
@ -72,6 +72,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
|
||||
@VisibleForTesting
|
||||
private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
|
||||
private boolean compositeSnapshot = true;
|
||||
|
||||
public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
|
||||
+ 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
|
||||
@ -160,7 +161,12 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||
stopCompaction();
|
||||
pushActiveToPipeline(this.active);
|
||||
snapshotId = EnvironmentEdgeManager.currentTime();
|
||||
pushTailToSnapshot();
|
||||
// in both cases whatever is pushed to snapshot is cleared from the pipeline
|
||||
if (compositeSnapshot) {
|
||||
pushPipelineToSnapshot();
|
||||
} else {
|
||||
pushTailToSnapshot();
|
||||
}
|
||||
}
|
||||
return new MemStoreSnapshot(snapshotId, this.snapshot);
|
||||
}
|
||||
@ -173,8 +179,13 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||
public MemstoreSize getFlushableSize() {
|
||||
MemstoreSize snapshotSize = getSnapshotSize();
|
||||
if (snapshotSize.getDataSize() == 0) {
|
||||
// if snapshot is empty the tail of the pipeline is flushed
|
||||
snapshotSize = pipeline.getTailSize();
|
||||
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
|
||||
if (compositeSnapshot) {
|
||||
snapshotSize = pipeline.getPipelineSize();
|
||||
snapshotSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead());
|
||||
} else {
|
||||
snapshotSize = pipeline.getTailSize();
|
||||
}
|
||||
}
|
||||
return snapshotSize.getDataSize() > 0 ? snapshotSize
|
||||
: new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
|
||||
@ -221,10 +232,20 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||
List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
|
||||
list.add(this.active);
|
||||
list.addAll(pipelineList);
|
||||
list.add(this.snapshot);
|
||||
list.addAll(this.snapshot.getAllSegments());
|
||||
|
||||
return list;
|
||||
}
|
||||
|
||||
// the following three methods allow to manipulate the settings of composite snapshot
|
||||
public void setCompositeSnapshot(boolean useCompositeSnapshot) {
|
||||
this.compositeSnapshot = useCompositeSnapshot;
|
||||
}
|
||||
|
||||
public boolean isCompositeSnapshot() {
|
||||
return this.compositeSnapshot;
|
||||
}
|
||||
|
||||
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
|
||||
boolean merge) {
|
||||
return pipeline.swap(versionedList, result, !merge);
|
||||
@ -265,17 +286,20 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||
*/
|
||||
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
|
||||
List<? extends Segment> pipelineList = pipeline.getSegments();
|
||||
long order = pipelineList.size();
|
||||
int order = pipelineList.size() + snapshot.getNumOfSegments();
|
||||
// The list of elements in pipeline + the active element + the snapshot segment
|
||||
// TODO : This will change when the snapshot is made of more than one element
|
||||
// The order is the Segment ordinal
|
||||
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
|
||||
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(order+1);
|
||||
list.add(this.active.getScanner(readPt, order + 1));
|
||||
for (Segment item : pipelineList) {
|
||||
list.add(item.getScanner(readPt, order));
|
||||
order--;
|
||||
}
|
||||
list.add(this.snapshot.getScanner(readPt, order));
|
||||
for (Segment item : snapshot.getAllSegments()) {
|
||||
list.add(item.getScanner(readPt, order));
|
||||
order--;
|
||||
}
|
||||
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
|
||||
}
|
||||
|
||||
@ -382,13 +406,37 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||
pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
|
||||
}
|
||||
|
||||
private void pushPipelineToSnapshot() {
|
||||
int iterationsCnt = 0;
|
||||
boolean done = false;
|
||||
while (!done) {
|
||||
iterationsCnt++;
|
||||
VersionedSegmentsList segments = pipeline.getVersionedList();
|
||||
pushToSnapshot(segments.getStoreSegments());
|
||||
// swap can return false in case the pipeline was updated by ongoing compaction
|
||||
// and the version increase, the chance of it happenning is very low
|
||||
done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now
|
||||
if (iterationsCnt>2) {
|
||||
// practically it is impossible that this loop iterates more than two times
|
||||
// (because the compaction is stopped and none restarts it while in snapshot request),
|
||||
// however stopping here for the case of the infinite loop causing by any error
|
||||
LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," +
|
||||
" while flushing to disk.");
|
||||
this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void pushToSnapshot(List<ImmutableSegment> segments) {
|
||||
if(segments.isEmpty()) return;
|
||||
if(segments.size() == 1 && !segments.get(0).isEmpty()) {
|
||||
this.snapshot = segments.get(0);
|
||||
return;
|
||||
} else { // create composite snapshot
|
||||
this.snapshot =
|
||||
SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
|
||||
}
|
||||
// TODO else craete composite snapshot
|
||||
}
|
||||
|
||||
private RegionServicesForStores getRegionServices() {
|
||||
|
@ -238,6 +238,18 @@ public class CompactionPipeline {
|
||||
return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
|
||||
}
|
||||
|
||||
public MemstoreSize getPipelineSize() {
|
||||
long keySize = 0;
|
||||
long heapOverhead = 0;
|
||||
LinkedList<? extends Segment> localCopy = readOnlyCopy;
|
||||
if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE;
|
||||
for (Segment segment : localCopy) {
|
||||
keySize += segment.keySize();
|
||||
heapOverhead += segment.heapOverhead();
|
||||
}
|
||||
return new MemstoreSize(keySize, heapOverhead);
|
||||
}
|
||||
|
||||
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
|
||||
boolean closeSegmentsInSuffix) {
|
||||
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
|
||||
|
@ -0,0 +1,306 @@
|
||||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
|
||||
/**
|
||||
* The CompositeImmutableSegments is created as a collection of ImmutableSegments and supports
|
||||
* the interface of a single ImmutableSegments.
|
||||
* The CompositeImmutableSegments is planned to be used only as a snapshot,
|
||||
* thus only relevant interfaces are supported
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CompositeImmutableSegment extends ImmutableSegment {
|
||||
|
||||
private final List<ImmutableSegment> segments;
|
||||
private final CellComparator comparator;
|
||||
// CompositeImmutableSegment is used for snapshots and snapshot should
|
||||
// support getTimeRangeTracker() interface.
|
||||
// Thus we hold a constant TRT build in the construction time from TRT of the given segments.
|
||||
private final TimeRangeTracker timeRangeTracker;
|
||||
|
||||
private long keySize = 0;
|
||||
|
||||
public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
|
||||
super(comparator);
|
||||
this.comparator = comparator;
|
||||
this.segments = segments;
|
||||
this.timeRangeTracker = new TimeRangeTracker();
|
||||
for (ImmutableSegment s : segments) {
|
||||
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
|
||||
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
|
||||
this.keySize += s.keySize();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public List<Segment> getAllSegments() {
|
||||
return new LinkedList<Segment>(segments);
|
||||
}
|
||||
|
||||
public int getNumOfSegments() {
|
||||
return segments.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a special scanner for the MemStoreSnapshot object that is different than the
|
||||
* general segment scanner.
|
||||
* @return a special scanner for the MemStoreSnapshot object
|
||||
*/
|
||||
public KeyValueScanner getSnapshotScanner() {
|
||||
return getScanner(Long.MAX_VALUE, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether the segment has any cells
|
||||
*/
|
||||
public boolean isEmpty() {
|
||||
for (ImmutableSegment s : segments) {
|
||||
if (!s.isEmpty()) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return number of cells in segment
|
||||
*/
|
||||
public int getCellsCount() {
|
||||
int result = 0;
|
||||
for (ImmutableSegment s : segments) {
|
||||
result += s.getCellsCount();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the first cell in the segment that has equal or greater key than the given cell
|
||||
*/
|
||||
public Cell getFirstAfter(Cell cell) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
/**
|
||||
* Closing a segment before it is being discarded
|
||||
*/
|
||||
public void close() {
|
||||
for (ImmutableSegment s : segments) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the segment has a memory allocator the cell is being cloned to this space, and returned;
|
||||
* otherwise the given cell is returned
|
||||
* @return either the given cell or its clone
|
||||
*/
|
||||
public Cell maybeCloneWithAllocator(Cell cell) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS){
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
public long getMinTimestamp(){
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the scanner for the given read point
|
||||
* @return a scanner for the given read point
|
||||
*/
|
||||
public KeyValueScanner getScanner(long readPoint) {
|
||||
// Long.MAX_VALUE is DEFAULT_SCANNER_ORDER
|
||||
return getScanner(readPoint,Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the scanner for the given read point, and a specific order in a list
|
||||
* @return a scanner for the given read point
|
||||
*/
|
||||
public KeyValueScanner getScanner(long readPoint, long order) {
|
||||
KeyValueScanner resultScanner;
|
||||
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
|
||||
for (ImmutableSegment s : segments) {
|
||||
list.add(s.getScanner(readPoint, order));
|
||||
}
|
||||
|
||||
try {
|
||||
resultScanner = new MemStoreScanner(getComparator(), list);
|
||||
} catch (IOException ie) {
|
||||
throw new IllegalStateException(ie);
|
||||
}
|
||||
|
||||
return resultScanner;
|
||||
}
|
||||
|
||||
public boolean isTagsPresent() {
|
||||
for (ImmutableSegment s : segments) {
|
||||
if (s.isTagsPresent()) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void incScannerCount() {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
public void decScannerCount() {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
/**
|
||||
* Setting the CellSet of the segment - used only for flat immutable segment for setting
|
||||
* immutable CellSet after its creation in immutable segment constructor
|
||||
* @return this object
|
||||
*/
|
||||
|
||||
protected CompositeImmutableSegment setCellSet(CellSet cellSetOld, CellSet cellSetNew) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Sum of all cell sizes.
|
||||
*/
|
||||
public long keySize() {
|
||||
return this.keySize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The heap overhead of this segment.
|
||||
*/
|
||||
public long heapOverhead() {
|
||||
long result = 0;
|
||||
for (ImmutableSegment s : segments) {
|
||||
result += s.heapOverhead();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the heap size counter of the segment by the given delta
|
||||
*/
|
||||
protected void incSize(long delta, long heapOverhead) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
protected void incHeapOverheadSize(long delta) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
public long getMinSequenceId() {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
public TimeRangeTracker getTimeRangeTracker() {
|
||||
return this.timeRangeTracker;
|
||||
}
|
||||
|
||||
//*** Methods for SegmentsScanner
|
||||
public Cell last() {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
public Iterator<Cell> iterator() {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
public int compare(Cell left, Cell right) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
public int compareRows(Cell left, Cell right) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a set of all cells in the segment
|
||||
*/
|
||||
protected CellSet getCellSet() {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Cell comparator used by this segment
|
||||
* @return the Cell comparator used by this segment
|
||||
*/
|
||||
protected CellComparator getComparator() {
|
||||
return comparator;
|
||||
}
|
||||
|
||||
protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
|
||||
MemstoreSize memstoreSize) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
protected long heapOverheadChange(Cell cell, boolean succ) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a subset of the segment cell set, which starts with the given cell
|
||||
* @param firstCell a cell in the segment
|
||||
* @return a subset of the segment cell set, which starts with the given cell
|
||||
*/
|
||||
protected SortedSet<Cell> tailSet(Cell firstCell) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
// Debug methods
|
||||
/**
|
||||
* Dumps all cells of the segment into the given log
|
||||
*/
|
||||
void dump(Log log) {
|
||||
for (ImmutableSegment s : segments) {
|
||||
s.dump(log);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb =
|
||||
new StringBuilder("This is CompositeImmutableSegment and those are its segments:: ");
|
||||
for (ImmutableSegment s : segments) {
|
||||
sb.append(s.toString());
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
@ -29,6 +29,9 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
|
||||
@ -67,6 +70,14 @@ public class ImmutableSegment extends Segment {
|
||||
}
|
||||
|
||||
///////////////////// CONSTRUCTORS /////////////////////
|
||||
/**------------------------------------------------------------------------
|
||||
* Empty C-tor to be used only for CompositeImmutableSegment
|
||||
*/
|
||||
protected ImmutableSegment(CellComparator comparator) {
|
||||
super(comparator);
|
||||
this.timeRange = null;
|
||||
}
|
||||
|
||||
/**------------------------------------------------------------------------
|
||||
* Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
|
||||
* This C-tor should be used when active MutableSegment is pushed into the compaction
|
||||
@ -141,6 +152,15 @@ public class ImmutableSegment extends Segment {
|
||||
return this.timeRange.getMin();
|
||||
}
|
||||
|
||||
public int getNumOfSegments() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
public List<Segment> getAllSegments() {
|
||||
List<Segment> res = new ArrayList<Segment>(Arrays.asList(this));
|
||||
return res;
|
||||
}
|
||||
|
||||
/**------------------------------------------------------------------------
|
||||
* Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
|
||||
* based on CellArrayMap.
|
||||
@ -231,7 +251,7 @@ public class ImmutableSegment extends Segment {
|
||||
Cell curCell;
|
||||
int idx = 0;
|
||||
// create this segment scanner with maximal possible read point, to go over all Cells
|
||||
SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
|
||||
KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
|
||||
|
||||
try {
|
||||
while ((curCell = segmentScanner.next()) != null) {
|
||||
|
@ -56,7 +56,7 @@ public class MemStoreCompactor {
|
||||
|
||||
// The upper bound for the number of segments we store in the pipeline prior to merging.
|
||||
// This constant is subject to further experimentation.
|
||||
private static final int THRESHOLD_PIPELINE_SEGMENTS = 1;
|
||||
private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
|
||||
|
||||
|
@ -25,19 +25,32 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
@InterfaceAudience.Private
|
||||
public class MemstoreSize {
|
||||
|
||||
static final MemstoreSize EMPTY_SIZE = new MemstoreSize();
|
||||
|
||||
private long dataSize;
|
||||
private long heapOverhead;
|
||||
final private boolean isEmpty;
|
||||
|
||||
static final MemstoreSize EMPTY_SIZE = new MemstoreSize(true);
|
||||
|
||||
public MemstoreSize() {
|
||||
dataSize = 0;
|
||||
heapOverhead = 0;
|
||||
isEmpty = false;
|
||||
}
|
||||
|
||||
public MemstoreSize(boolean isEmpty) {
|
||||
dataSize = 0;
|
||||
heapOverhead = 0;
|
||||
this.isEmpty = isEmpty;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return isEmpty;
|
||||
}
|
||||
|
||||
public MemstoreSize(long dataSize, long heapOverhead) {
|
||||
this.dataSize = dataSize;
|
||||
this.heapOverhead = heapOverhead;
|
||||
this.isEmpty = false;
|
||||
}
|
||||
|
||||
public void incMemstoreSize(long dataSize, long heapOverhead) {
|
||||
@ -61,11 +74,13 @@ public class MemstoreSize {
|
||||
}
|
||||
|
||||
public long getDataSize() {
|
||||
return dataSize;
|
||||
|
||||
return isEmpty ? 0 : dataSize;
|
||||
}
|
||||
|
||||
public long getHeapOverhead() {
|
||||
return heapOverhead;
|
||||
|
||||
return isEmpty ? 0 : heapOverhead;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -74,7 +89,7 @@ public class MemstoreSize {
|
||||
return false;
|
||||
}
|
||||
MemstoreSize other = (MemstoreSize) obj;
|
||||
return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead;
|
||||
return getDataSize() == other.dataSize && getHeapOverhead() == other.heapOverhead;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -18,7 +18,9 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@ -64,6 +66,15 @@ public abstract class Segment {
|
||||
protected final TimeRangeTracker timeRangeTracker;
|
||||
protected volatile boolean tagsPresent;
|
||||
|
||||
// Empty constructor to be used when Segment is used as interface,
|
||||
// and there is no need in true Segments state
|
||||
protected Segment(CellComparator comparator) {
|
||||
this.comparator = comparator;
|
||||
this.dataSize = new AtomicLong(0);
|
||||
this.heapOverhead = new AtomicLong(0);
|
||||
this.timeRangeTracker = new TimeRangeTracker();
|
||||
}
|
||||
|
||||
// This constructor is used to create empty Segments.
|
||||
protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
|
||||
this.cellSet.set(cellSet);
|
||||
@ -91,7 +102,7 @@ public abstract class Segment {
|
||||
* Creates the scanner for the given read point
|
||||
* @return a scanner for the given read point
|
||||
*/
|
||||
public SegmentScanner getScanner(long readPoint) {
|
||||
public KeyValueScanner getScanner(long readPoint) {
|
||||
return new SegmentScanner(this, readPoint);
|
||||
}
|
||||
|
||||
@ -99,10 +110,16 @@ public abstract class Segment {
|
||||
* Creates the scanner for the given read point, and a specific order in a list
|
||||
* @return a scanner for the given read point
|
||||
*/
|
||||
public SegmentScanner getScanner(long readPoint, long order) {
|
||||
public KeyValueScanner getScanner(long readPoint, long order) {
|
||||
return new SegmentScanner(this, readPoint, order);
|
||||
}
|
||||
|
||||
public List<KeyValueScanner> getScanners(long readPoint, long order) {
|
||||
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
|
||||
scanners.add(getScanner(readPoint, order));
|
||||
return scanners;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether the segment has any cells
|
||||
*/
|
||||
|
@ -47,6 +47,13 @@ public final class SegmentFactory {
|
||||
return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
|
||||
}
|
||||
|
||||
// create composite immutable segment from a list of segments
|
||||
public CompositeImmutableSegment createCompositeImmutableSegment(
|
||||
final CellComparator comparator, List<ImmutableSegment> segments) {
|
||||
return new CompositeImmutableSegment(comparator, segments);
|
||||
|
||||
}
|
||||
|
||||
// create new flat immutable segment from compacting old immutable segments
|
||||
public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
|
||||
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
|
||||
@ -102,6 +109,9 @@ public final class SegmentFactory {
|
||||
|
||||
private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {
|
||||
List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
|
||||
if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
|
||||
return null;
|
||||
}
|
||||
for (ImmutableSegment segment : segments) {
|
||||
mslabs.add(segment.getMemStoreLAB());
|
||||
}
|
||||
|
@ -622,10 +622,9 @@ public class TestWalAndCompactingMemStoreFlush {
|
||||
// Set up the configuration
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
|
||||
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class
|
||||
.getName());
|
||||
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 *
|
||||
1024);
|
||||
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
|
||||
FlushNonSloppyStoresFirstPolicy.class.getName());
|
||||
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
|
||||
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
|
||||
// set memstore to do data compaction and not to use the speculative scan
|
||||
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
|
||||
@ -648,6 +647,10 @@ public class TestWalAndCompactingMemStoreFlush {
|
||||
region.put(createPut(2, i));
|
||||
}
|
||||
|
||||
// in this test check the non-composite snapshot - flashing only tail of the pipeline
|
||||
((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
|
||||
((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
|
||||
|
||||
long totalMemstoreSize = region.getMemstoreSize();
|
||||
|
||||
// Find the sizes of the memstores of each CF.
|
||||
|
Loading…
x
Reference in New Issue
Block a user