HBASE-21738 Remove all the CLSM#size operation in our memstore because it's an quite time consuming.
This commit is contained in:
parent
a2f6768acd
commit
35df6147ee
|
@ -92,8 +92,8 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
// regionServices can be null when testing
|
||||
if (regionServices != null) {
|
||||
regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
|
||||
memstoreAccounting.getHeapSize(),
|
||||
memstoreAccounting.getOffHeapSize());
|
||||
memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
|
||||
memstoreAccounting.getCellsCount());
|
||||
}
|
||||
timeOfOldestEdit = Long.MAX_VALUE;
|
||||
}
|
||||
|
|
|
@ -40,9 +40,9 @@ public class CSLMImmutableSegment extends ImmutableSegment {
|
|||
super(segment);
|
||||
// update the segment metadata heap size
|
||||
long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM;
|
||||
incMemStoreSize(0, indexOverhead, 0); // CSLM is always on-heap
|
||||
incMemStoreSize(0, indexOverhead, 0, 0); // CSLM is always on-heap
|
||||
if (memstoreSizing != null) {
|
||||
memstoreSizing.incMemStoreSize(0, indexOverhead, 0);
|
||||
memstoreSizing.incMemStoreSize(0, indexOverhead, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
|
|||
protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
|
||||
MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
|
||||
super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
|
||||
incMemStoreSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap
|
||||
incMemStoreSize(0, DEEP_OVERHEAD_CAM, 0, 0); // CAM is always on-heap
|
||||
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
|
||||
initializeCellSet(numOfCells, iterator, action);
|
||||
}
|
||||
|
@ -59,17 +59,18 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
|
|||
MemStoreCompactionStrategy.Action action) {
|
||||
super(segment); // initiailize the upper class
|
||||
long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
|
||||
incMemStoreSize(0, indexOverhead, 0); // CAM is always on-heap
|
||||
mss.incMemStoreSize(0, indexOverhead, 0);
|
||||
incMemStoreSize(0, indexOverhead, 0, 0); // CAM is always on-heap
|
||||
mss.incMemStoreSize(0, indexOverhead, 0, 0);
|
||||
int numOfCells = segment.getCellsCount();
|
||||
// build the new CellSet based on CellChunkMap and update the CellSet of this Segment
|
||||
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
|
||||
action);
|
||||
action);
|
||||
// arrange the meta-data size, decrease all meta-data sizes related to SkipList;
|
||||
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
|
||||
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
|
||||
incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
mss.incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
long newSegmentSizeDelta =
|
||||
numOfCells * (indexEntrySize() - ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
|
||||
incMemStoreSize(0, newSegmentSizeDelta, 0, 0);
|
||||
mss.incMemStoreSize(0, newSegmentSizeDelta, 0, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -57,10 +57,10 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
// memStoreLAB cannot be null in this class
|
||||
boolean onHeap = getMemStoreLAB().isOnHeap();
|
||||
// initiate the heapSize with the size of the segment metadata
|
||||
if(onHeap) {
|
||||
incMemStoreSize(0, indexOverhead, 0);
|
||||
if (onHeap) {
|
||||
incMemStoreSize(0, indexOverhead, 0, 0);
|
||||
} else {
|
||||
incMemStoreSize(0, 0, indexOverhead);
|
||||
incMemStoreSize(0, 0, indexOverhead, 0);
|
||||
}
|
||||
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
|
||||
initializeCellSet(numOfCells, iterator, action);
|
||||
|
@ -79,12 +79,12 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
boolean onHeap = getMemStoreLAB().isOnHeap();
|
||||
// initiate the heapSize with the size of the segment metadata
|
||||
if(onHeap) {
|
||||
incMemStoreSize(0, indexOverhead, 0);
|
||||
memstoreSizing.incMemStoreSize(0, indexOverhead, 0);
|
||||
incMemStoreSize(0, indexOverhead, 0, 0);
|
||||
memstoreSizing.incMemStoreSize(0, indexOverhead, 0, 0);
|
||||
} else {
|
||||
incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM);
|
||||
memstoreSizing.incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM,
|
||||
DEEP_OVERHEAD_CCM);
|
||||
incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM, 0);
|
||||
memstoreSizing.incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM,
|
||||
0);
|
||||
}
|
||||
int numOfCells = segment.getCellsCount();
|
||||
// build the new CellSet based on CellChunkMap
|
||||
|
@ -95,11 +95,11 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
// (reinitializeCellSet doesn't take the care for the sizes)
|
||||
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
|
||||
if(onHeap) {
|
||||
incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
incMemStoreSize(0, newSegmentSizeDelta, 0, 0);
|
||||
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0, 0);
|
||||
} else {
|
||||
incMemStoreSize(0, 0, newSegmentSizeDelta);
|
||||
memstoreSizing.incMemStoreSize(0, 0, newSegmentSizeDelta);
|
||||
incMemStoreSize(0, 0, newSegmentSizeDelta, 0);
|
||||
memstoreSizing.incMemStoreSize(0, 0, newSegmentSizeDelta, 0);
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -335,9 +335,9 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
long newCellSize = getCellLength(cell);
|
||||
long heapOverhead = newHeapSize - oldHeapSize;
|
||||
long offHeapOverhead = newOffHeapSize - oldOffHeapSize;
|
||||
incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
|
||||
incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead, 0);
|
||||
if(memstoreSizing != null) {
|
||||
memstoreSizing.incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
|
||||
memstoreSizing.incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead, 0);
|
||||
}
|
||||
return cell;
|
||||
}
|
||||
|
|
|
@ -217,6 +217,9 @@ public class CellSet implements NavigableSet<Cell> {
|
|||
|
||||
@Override
|
||||
public int size() {
|
||||
if (delegatee instanceof ConcurrentSkipListMap) {
|
||||
throw new UnsupportedOperationException("ConcurrentSkipListMap.size() is time-consuming");
|
||||
}
|
||||
return this.delegatee.size();
|
||||
}
|
||||
|
||||
|
|
|
@ -308,9 +308,10 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
* @param memstoreSizing object to accumulate region size changes
|
||||
* @return true iff can proceed with applying the update
|
||||
*/
|
||||
@Override protected boolean preUpdate(MutableSegment currentActive, Cell cell,
|
||||
@Override
|
||||
protected boolean preUpdate(MutableSegment currentActive, Cell cell,
|
||||
MemStoreSizing memstoreSizing) {
|
||||
if(currentActive.sharedLock()) {
|
||||
if (currentActive.sharedLock()) {
|
||||
if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) {
|
||||
return true;
|
||||
}
|
||||
|
@ -501,11 +502,11 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
|
||||
// when replaying edits from WAL there is no need in in-memory flush regardless the size
|
||||
// otherwise size below flush threshold try to update atomically
|
||||
if(currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
|
||||
if(memstoreSizing != null){
|
||||
memstoreSizing.incMemStoreSize(cellSize, 0, 0);
|
||||
if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
|
||||
if (memstoreSizing != null) {
|
||||
memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
|
||||
}
|
||||
//enough space for cell - no need to flush
|
||||
// enough space for cell - no need to flush
|
||||
return false;
|
||||
}
|
||||
segmentDataSize = currActive.getDataSize();
|
||||
|
|
|
@ -77,9 +77,8 @@ public class CompactionPipeline {
|
|||
ImmutableSegment immutableSegment = SegmentFactory.instance().
|
||||
createImmutableSegment(segment, memstoreAccounting);
|
||||
if (region != null) {
|
||||
region.addMemStoreSize(memstoreAccounting.getDataSize(),
|
||||
memstoreAccounting.getHeapSize(),
|
||||
memstoreAccounting.getOffHeapSize());
|
||||
region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(),
|
||||
memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount());
|
||||
}
|
||||
synchronized (pipeline){
|
||||
boolean res = addFirst(immutableSegment);
|
||||
|
@ -141,26 +140,30 @@ public class CompactionPipeline {
|
|||
if (updateRegionSize && region != null) {
|
||||
// update the global memstore size counter
|
||||
long suffixDataSize = getSegmentsKeySize(suffix);
|
||||
long newDataSize = 0;
|
||||
if(segment != null) {
|
||||
newDataSize = segment.getDataSize();
|
||||
}
|
||||
long dataSizeDelta = suffixDataSize - newDataSize;
|
||||
long suffixHeapSize = getSegmentsHeapSize(suffix);
|
||||
long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
|
||||
int suffixCellsCount = getSegmentsCellsCount(suffix);
|
||||
long newDataSize = 0;
|
||||
long newHeapSize = 0;
|
||||
long newOffHeapSize = 0;
|
||||
if(segment != null) {
|
||||
int newCellsCount = 0;
|
||||
if (segment != null) {
|
||||
newDataSize = segment.getDataSize();
|
||||
newHeapSize = segment.getHeapSize();
|
||||
newOffHeapSize = segment.getOffHeapSize();
|
||||
newCellsCount = segment.getCellsCount();
|
||||
}
|
||||
long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
|
||||
long dataSizeDelta = suffixDataSize - newDataSize;
|
||||
long heapSizeDelta = suffixHeapSize - newHeapSize;
|
||||
region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
|
||||
LOG.debug("Suffix data size={}, new segment data size={}, " + "suffix heap size={},"
|
||||
+ "new segment heap size={}" + "suffix off heap size={},"
|
||||
+ "new segment off heap size={}", suffixDataSize, newDataSize, suffixHeapSize,
|
||||
newHeapSize, suffixOffHeapSize, newOffHeapSize);
|
||||
long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
|
||||
int cellsCountDelta = suffixCellsCount - newCellsCount;
|
||||
region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
|
||||
LOG.debug(
|
||||
"Suffix data size={}, new segment data size={}, suffix heap size={},new segment heap "
|
||||
+ "size={} suffix off heap size={}, new segment off heap size={}, suffix cells "
|
||||
+ "count={}, new segment cells count={}",
|
||||
suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize, newOffHeapSize,
|
||||
suffixCellsCount, newCellsCount);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -189,6 +192,14 @@ public class CompactionPipeline {
|
|||
return res;
|
||||
}
|
||||
|
||||
private static int getSegmentsCellsCount(List<? extends Segment> list) {
|
||||
int res = 0;
|
||||
for (Segment segment : list) {
|
||||
res += segment.getCellsCount();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the caller holds the current version, go over the the pipeline and try to flatten each
|
||||
* segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based.
|
||||
|
@ -221,11 +232,12 @@ public class CompactionPipeline {
|
|||
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
|
||||
(CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action);
|
||||
replaceAtIndex(i,newS);
|
||||
if(region != null) {
|
||||
if (region != null) {
|
||||
// Update the global memstore size counter upon flattening there is no change in the
|
||||
// data size
|
||||
MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
|
||||
region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
|
||||
mss.getCellsCount());
|
||||
}
|
||||
LOG.debug("Compaction pipeline segment {} flattened", s);
|
||||
return true;
|
||||
|
|
|
@ -190,7 +190,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
|
|||
* Updates the heap size counter of the segment by the given delta
|
||||
*/
|
||||
@Override
|
||||
public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) {
|
||||
public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead, int cellsCount) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
|
|
|
@ -104,8 +104,8 @@ public class DefaultMemStore extends AbstractMemStore {
|
|||
// regionServices can be null when testing
|
||||
if (regionServices != null) {
|
||||
regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
|
||||
memstoreAccounting.getHeapSize(),
|
||||
memstoreAccounting.getOffHeapSize());
|
||||
memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
|
||||
memstoreAccounting.getCellsCount());
|
||||
}
|
||||
this.snapshot = immutableSegment;
|
||||
resetActive();
|
||||
|
|
|
@ -1256,28 +1256,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* store
|
||||
*/
|
||||
void incMemStoreSize(MemStoreSize mss) {
|
||||
incMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
incMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
|
||||
mss.getCellsCount());
|
||||
}
|
||||
|
||||
void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
|
||||
int cellsCountDelta) {
|
||||
if (this.rsAccounting != null) {
|
||||
rsAccounting.incGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
}
|
||||
long dataSize =
|
||||
this.memStoreSizing.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
long dataSize = this.memStoreSizing.incMemStoreSize(dataSizeDelta, heapSizeDelta,
|
||||
offHeapSizeDelta, cellsCountDelta);
|
||||
checkNegativeMemStoreDataSize(dataSize, dataSizeDelta);
|
||||
}
|
||||
|
||||
void decrMemStoreSize(MemStoreSize mss) {
|
||||
decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
|
||||
mss.getCellsCount());
|
||||
}
|
||||
|
||||
void decrMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
void decrMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
|
||||
int cellsCountDelta) {
|
||||
if (this.rsAccounting != null) {
|
||||
rsAccounting.decGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
}
|
||||
long dataSize =
|
||||
this.memStoreSizing.decMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
long dataSize = this.memStoreSizing.decMemStoreSize(dataSizeDelta, heapSizeDelta,
|
||||
offHeapSizeDelta, cellsCountDelta);
|
||||
checkNegativeMemStoreDataSize(dataSize, -dataSizeDelta);
|
||||
}
|
||||
|
||||
|
@ -2801,7 +2805,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
// Set down the memstore size by amount of flush.
|
||||
MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
|
||||
this.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
this.decrMemStoreSize(mss);
|
||||
|
||||
// Increase the size of this Region for the purposes of quota. Noop if quotas are disabled.
|
||||
// During startup, quota manager may not be initialized yet.
|
||||
|
@ -3228,7 +3232,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
});
|
||||
// update memStore size
|
||||
region.incMemStoreSize(memStoreAccounting.getDataSize(), memStoreAccounting.getHeapSize(),
|
||||
memStoreAccounting.getOffHeapSize());
|
||||
memStoreAccounting.getOffHeapSize(), memStoreAccounting.getCellsCount());
|
||||
}
|
||||
|
||||
public boolean isDone() {
|
||||
|
|
|
@ -44,20 +44,23 @@ public class MemStoreSize {
|
|||
*/
|
||||
private final long offHeapSize;
|
||||
|
||||
private final int cellsCount;
|
||||
|
||||
/**
|
||||
* Package private constructor.
|
||||
*/
|
||||
MemStoreSize() {
|
||||
this(0L, 0L, 0L);
|
||||
this(0L, 0L, 0L, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Package private constructor.
|
||||
*/
|
||||
MemStoreSize(long dataSize, long heapSize, long offHeapSize) {
|
||||
MemStoreSize(long dataSize, long heapSize, long offHeapSize, int cellsCount) {
|
||||
this.dataSize = dataSize;
|
||||
this.heapSize = heapSize;
|
||||
this.offHeapSize = offHeapSize;
|
||||
this.cellsCount = cellsCount;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -67,10 +70,12 @@ public class MemStoreSize {
|
|||
this.dataSize = memStoreSize.getDataSize();
|
||||
this.heapSize = memStoreSize.getHeapSize();
|
||||
this.offHeapSize = memStoreSize.getOffHeapSize();
|
||||
this.cellsCount = memStoreSize.getCellsCount();
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0;
|
||||
return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0
|
||||
&& this.cellsCount == 0;
|
||||
}
|
||||
|
||||
public long getDataSize() {
|
||||
|
@ -85,6 +90,10 @@ public class MemStoreSize {
|
|||
return this.offHeapSize;
|
||||
}
|
||||
|
||||
public int getCellsCount() {
|
||||
return this.cellsCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
|
@ -93,9 +102,9 @@ public class MemStoreSize {
|
|||
if (!(obj instanceof MemStoreSize)) {
|
||||
return false;
|
||||
}
|
||||
MemStoreSize other = (MemStoreSize)obj;
|
||||
return this.dataSize == other.dataSize && this.heapSize == other.heapSize &&
|
||||
this.offHeapSize == other.offHeapSize;
|
||||
MemStoreSize other = (MemStoreSize) obj;
|
||||
return this.dataSize == other.dataSize && this.heapSize == other.heapSize
|
||||
&& this.offHeapSize == other.offHeapSize && this.cellsCount == other.cellsCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -103,12 +112,13 @@ public class MemStoreSize {
|
|||
long h = this.dataSize;
|
||||
h = h * 31 + this.heapSize;
|
||||
h = h * 31 + this.offHeapSize;
|
||||
h = h * 31 + this.cellsCount;
|
||||
return (int) h;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "dataSize=" + this.dataSize + ", getHeapSize=" + this.heapSize +
|
||||
", getOffHeapSize=" + this.offHeapSize;
|
||||
return "dataSize=" + this.dataSize + ", getHeapSize=" + this.heapSize + ", getOffHeapSize="
|
||||
+ this.offHeapSize + ", getCellsCount=" + this.cellsCount;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface MemStoreSizing {
|
||||
static final MemStoreSizing DUD = new MemStoreSizing() {
|
||||
MemStoreSizing DUD = new MemStoreSizing() {
|
||||
private final MemStoreSize mss = new MemStoreSize();
|
||||
|
||||
@Override
|
||||
|
@ -77,12 +77,18 @@ public interface MemStoreSizing {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
|
||||
long offHeapSizeDelta) {
|
||||
public int getCellsCount() {
|
||||
return this.mss.getCellsCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
|
||||
int cellsCountDelta) {
|
||||
throw new RuntimeException("I'm a DUD, you can't use me!");
|
||||
}
|
||||
|
||||
@Override public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
@Override
|
||||
public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
throw new RuntimeException("I'm a DUD, you can't use me!");
|
||||
}
|
||||
};
|
||||
|
@ -90,22 +96,25 @@ public interface MemStoreSizing {
|
|||
/**
|
||||
* @return The new dataSize ONLY as a convenience
|
||||
*/
|
||||
long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta);
|
||||
long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
|
||||
int cellsCountDelta);
|
||||
|
||||
default long incMemStoreSize(MemStoreSize delta) {
|
||||
return incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
|
||||
return incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize(),
|
||||
delta.getCellsCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The new dataSize ONLY as a convenience
|
||||
*/
|
||||
default long decMemStoreSize(long dataSizeDelta, long heapSizeDelta,
|
||||
long offHeapSizeDelta) {
|
||||
return incMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
|
||||
long offHeapSizeDelta, int cellsCountDelta) {
|
||||
return incMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
|
||||
}
|
||||
|
||||
default long decMemStoreSize(MemStoreSize delta) {
|
||||
return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), -delta.getOffHeapSize());
|
||||
return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), -delta.getOffHeapSize(),
|
||||
-delta.getCellsCount());
|
||||
}
|
||||
|
||||
boolean compareAndSetDataSize(long expected, long updated);
|
||||
|
@ -113,6 +122,7 @@ public interface MemStoreSizing {
|
|||
long getDataSize();
|
||||
long getHeapSize();
|
||||
long getOffHeapSize();
|
||||
int getCellsCount();
|
||||
|
||||
/**
|
||||
* @return Use this datastructure to return all three settings, {@link #getDataSize()},
|
||||
|
|
|
@ -50,9 +50,9 @@ public class MutableSegment extends Segment {
|
|||
protected MutableSegment(CellSet cellSet, CellComparator comparator,
|
||||
MemStoreLAB memStoreLAB, MemStoreSizing memstoreSizing) {
|
||||
super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
|
||||
incMemStoreSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata
|
||||
incMemStoreSize(0, DEEP_OVERHEAD, 0, 0); // update the mutable segment metadata
|
||||
if (memstoreSizing != null) {
|
||||
memstoreSizing.incMemStoreSize(0, DEEP_OVERHEAD, 0);
|
||||
memstoreSizing.incMemStoreSize(0, DEEP_OVERHEAD, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,9 +100,9 @@ public class MutableSegment extends Segment {
|
|||
int cellLen = getCellLength(cur);
|
||||
long heapSize = heapSizeChange(cur, true);
|
||||
long offHeapSize = offHeapSizeChange(cur, true);
|
||||
incMemStoreSize(-cellLen, -heapSize, -offHeapSize);
|
||||
incMemStoreSize(-cellLen, -heapSize, -offHeapSize, -1);
|
||||
if (memStoreSizing != null) {
|
||||
memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize);
|
||||
memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize, 1);
|
||||
}
|
||||
it.remove();
|
||||
} else {
|
||||
|
|
|
@ -32,35 +32,38 @@ class NonThreadSafeMemStoreSizing implements MemStoreSizing {
|
|||
private long dataSize = 0;
|
||||
private long heapSize = 0;
|
||||
private long offHeapSize = 0;
|
||||
private int cellsCount = 0;
|
||||
|
||||
NonThreadSafeMemStoreSizing() {
|
||||
this(0, 0, 0);
|
||||
this(0, 0, 0, 0);
|
||||
}
|
||||
|
||||
NonThreadSafeMemStoreSizing(MemStoreSize mss) {
|
||||
this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), mss.getCellsCount());
|
||||
}
|
||||
|
||||
NonThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
|
||||
incMemStoreSize(dataSize, heapSize, offHeapSize);
|
||||
NonThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize, int cellsCount) {
|
||||
incMemStoreSize(dataSize, heapSize, offHeapSize, cellsCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MemStoreSize getMemStoreSize() {
|
||||
return new MemStoreSize(this.dataSize, this.heapSize, this.offHeapSize);
|
||||
return new MemStoreSize(this.dataSize, this.heapSize, this.offHeapSize, this.cellsCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
|
||||
long offHeapSizeDelta) {
|
||||
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
|
||||
int cellsCountDelta) {
|
||||
this.offHeapSize += offHeapSizeDelta;
|
||||
this.heapSize += heapSizeDelta;
|
||||
this.dataSize += dataSizeDelta;
|
||||
this.cellsCount += cellsCountDelta;
|
||||
return this.dataSize;
|
||||
}
|
||||
|
||||
@Override public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
if(dataSize == expected) {
|
||||
@Override
|
||||
public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
if (dataSize == expected) {
|
||||
dataSize = updated;
|
||||
return true;
|
||||
}
|
||||
|
@ -82,6 +85,11 @@ class NonThreadSafeMemStoreSizing implements MemStoreSizing {
|
|||
return offHeapSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCellsCount() {
|
||||
return cellsCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getMemStoreSize().toString();
|
||||
|
|
|
@ -57,8 +57,9 @@ public class RegionServicesForStores {
|
|||
this.region = region;
|
||||
}
|
||||
|
||||
public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
|
||||
int cellsCountDelta) {
|
||||
region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta, cellsCountDelta);
|
||||
}
|
||||
|
||||
public RegionInfo getRegionInfo() {
|
||||
|
|
|
@ -28,9 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.ExtendedCell;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
@ -83,17 +81,19 @@ public abstract class Segment implements MemStoreSizing {
|
|||
long dataSize = 0;
|
||||
long heapSize = 0;
|
||||
long OffHeapSize = 0;
|
||||
int cellsCount = 0;
|
||||
for (Segment segment : segments) {
|
||||
MemStoreSize memStoreSize = segment.getMemStoreSize();
|
||||
dataSize += memStoreSize.getDataSize();
|
||||
heapSize += memStoreSize.getHeapSize();
|
||||
OffHeapSize += memStoreSize.getOffHeapSize();
|
||||
cellsCount += memStoreSize.getCellsCount();
|
||||
}
|
||||
this.comparator = comparator;
|
||||
this.updatesLock = new ReentrantReadWriteLock();
|
||||
// Do we need to be thread safe always? What if ImmutableSegment?
|
||||
// DITTO for the TimeRangeTracker below.
|
||||
this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize);
|
||||
this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize, cellsCount);
|
||||
this.timeRangeTracker = trt;
|
||||
}
|
||||
|
||||
|
@ -141,12 +141,6 @@ public abstract class Segment implements MemStoreSizing {
|
|||
return getCellSet().isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return number of cells in segment
|
||||
*/
|
||||
public int getCellsCount() {
|
||||
return getCellSet().size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Closing a segment before it is being discarded
|
||||
|
@ -175,7 +169,7 @@ public abstract class Segment implements MemStoreSizing {
|
|||
return cell;
|
||||
}
|
||||
|
||||
Cell cellFromMslab = null;
|
||||
Cell cellFromMslab;
|
||||
if (forceCloneOfBigCell) {
|
||||
cellFromMslab = this.memStoreLAB.forceCopyOfBigCellInto(cell);
|
||||
} else {
|
||||
|
@ -189,8 +183,7 @@ public abstract class Segment implements MemStoreSizing {
|
|||
*/
|
||||
@VisibleForTesting
|
||||
static int getCellLength(Cell cell) {
|
||||
return cell instanceof ExtendedCell ? ((ExtendedCell)cell).getSerializedSize():
|
||||
KeyValueUtil.length(cell);
|
||||
return cell.getSerializedSize();
|
||||
}
|
||||
|
||||
public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
|
||||
|
@ -247,8 +240,13 @@ public abstract class Segment implements MemStoreSizing {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) {
|
||||
return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
|
||||
public int getCellsCount() {
|
||||
return memStoreSizing.getCellsCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead, int cellsCount) {
|
||||
return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead, cellsCount);
|
||||
}
|
||||
|
||||
public boolean sharedLock() {
|
||||
|
@ -324,20 +322,21 @@ public abstract class Segment implements MemStoreSizing {
|
|||
MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) {
|
||||
long delta = 0;
|
||||
long cellSize = getCellLength(cellToAdd);
|
||||
int cellsCount = succ ? 1 : 0;
|
||||
// If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
|
||||
// MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
|
||||
// than the counted number)
|
||||
if (succ || mslabUsed) {
|
||||
delta = cellSize;
|
||||
}
|
||||
if(sizeAddedPreOperation) {
|
||||
if (sizeAddedPreOperation) {
|
||||
delta -= cellSize;
|
||||
}
|
||||
long heapSize = heapSizeChange(cellToAdd, succ || mslabUsed);
|
||||
long offHeapSize = offHeapSizeChange(cellToAdd, succ || mslabUsed);
|
||||
incMemStoreSize(delta, heapSize, offHeapSize);
|
||||
incMemStoreSize(delta, heapSize, offHeapSize, cellsCount);
|
||||
if (memstoreSizing != null) {
|
||||
memstoreSizing.incMemStoreSize(delta, heapSize, offHeapSize);
|
||||
memstoreSizing.incMemStoreSize(delta, heapSize, offHeapSize, cellsCount);
|
||||
}
|
||||
getTimeRangeTracker().includeTimestamp(cellToAdd);
|
||||
minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -35,31 +36,36 @@ class ThreadSafeMemStoreSizing implements MemStoreSizing {
|
|||
private final AtomicLong dataSize = new AtomicLong();
|
||||
private final AtomicLong heapSize = new AtomicLong();
|
||||
private final AtomicLong offHeapSize = new AtomicLong();
|
||||
private final AtomicInteger cellsCount = new AtomicInteger();
|
||||
|
||||
ThreadSafeMemStoreSizing() {
|
||||
this(0, 0, 0);
|
||||
this(0, 0, 0, 0);
|
||||
}
|
||||
|
||||
ThreadSafeMemStoreSizing(MemStoreSize mss) {
|
||||
this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), mss.getCellsCount());
|
||||
}
|
||||
|
||||
ThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
|
||||
incMemStoreSize(dataSize, heapSize, offHeapSize);
|
||||
ThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize, int cellsCount) {
|
||||
incMemStoreSize(dataSize, heapSize, offHeapSize, cellsCount);
|
||||
}
|
||||
|
||||
public MemStoreSize getMemStoreSize() {
|
||||
return new MemStoreSize(getDataSize(), getHeapSize(), getOffHeapSize());
|
||||
return new MemStoreSize(getDataSize(), getHeapSize(), getOffHeapSize(), getCellsCount());
|
||||
}
|
||||
|
||||
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
@Override
|
||||
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
|
||||
int cellsCountDelta) {
|
||||
this.offHeapSize.addAndGet(offHeapSizeDelta);
|
||||
this.heapSize.addAndGet(heapSizeDelta);
|
||||
this.cellsCount.addAndGet(cellsCountDelta);
|
||||
return this.dataSize.addAndGet(dataSizeDelta);
|
||||
}
|
||||
|
||||
@Override public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
return dataSize.compareAndSet(expected,updated);
|
||||
@Override
|
||||
public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
return dataSize.compareAndSet(expected, updated);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,6 +83,11 @@ class ThreadSafeMemStoreSizing implements MemStoreSizing {
|
|||
return offHeapSize.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCellsCount() {
|
||||
return cellsCount.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getMemStoreSize().toString();
|
||||
|
|
|
@ -48,20 +48,20 @@ public class TestCellSkipListSet extends TestCase {
|
|||
}
|
||||
|
||||
public void testAdd() throws Exception {
|
||||
byte [] bytes = Bytes.toBytes(getName());
|
||||
byte[] bytes = Bytes.toBytes(getName());
|
||||
KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
|
||||
this.csls.add(kv);
|
||||
assertTrue(this.csls.contains(kv));
|
||||
assertEquals(1, this.csls.size());
|
||||
assertEquals(1, this.csls.getDelegatee().size());
|
||||
Cell first = this.csls.first();
|
||||
assertTrue(kv.equals(first));
|
||||
assertTrue(Bytes.equals(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
|
||||
first.getValueArray(), first.getValueOffset(), first.getValueLength()));
|
||||
// Now try overwritting
|
||||
byte [] overwriteValue = Bytes.toBytes("overwrite");
|
||||
byte[] overwriteValue = Bytes.toBytes("overwrite");
|
||||
KeyValue overwrite = new KeyValue(bytes, bytes, bytes, overwriteValue);
|
||||
this.csls.add(overwrite);
|
||||
assertEquals(1, this.csls.size());
|
||||
assertEquals(1, this.csls.getDelegatee().size());
|
||||
first = this.csls.first();
|
||||
assertTrue(Bytes.equals(overwrite.getValueArray(), overwrite.getValueOffset(),
|
||||
overwrite.getValueLength(), first.getValueArray(), first.getValueOffset(),
|
||||
|
|
|
@ -679,7 +679,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
|
||||
mss.getCellsCount());
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(7, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -756,7 +757,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
|
||||
mss.getCellsCount());
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(4, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -831,6 +833,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
byte[] qf = Bytes.toBytes("testqualifier");
|
||||
long size = hmc.getActive().getDataSize();
|
||||
long heapOverhead = hmc.getActive().getHeapSize();
|
||||
int cellsCount = hmc.getActive().getCellsCount();
|
||||
int totalLen = 0;
|
||||
for (int i = 0; i < keys.length; i++) {
|
||||
long timestamp = System.currentTimeMillis();
|
||||
|
@ -843,7 +846,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
|
||||
}
|
||||
regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
|
||||
hmc.getActive().getHeapSize() - heapOverhead, 0);
|
||||
hmc.getActive().getHeapSize() - heapOverhead, 0,
|
||||
hmc.getActive().getCellsCount() - cellsCount);
|
||||
return totalLen;
|
||||
}
|
||||
|
||||
|
@ -853,6 +857,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
byte[] qf = Bytes.toBytes("testqualifier");
|
||||
long size = hmc.getActive().getDataSize();
|
||||
long heapOverhead = hmc.getActive().getHeapSize();
|
||||
int cellsCount = hmc.getActive().getCellsCount();
|
||||
int totalLen = 0;
|
||||
for (int i = 0; i < keys.length; i++) {
|
||||
long timestamp = System.currentTimeMillis();
|
||||
|
@ -864,7 +869,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
|
||||
}
|
||||
regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
|
||||
hmc.getActive().getHeapSize() - heapOverhead, 0);
|
||||
hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount);
|
||||
return totalLen;
|
||||
}
|
||||
|
||||
|
|
|
@ -283,13 +283,13 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
|
||||
mss.getCellsCount());
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(4, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
||||
memstore.clearSnapshot(snapshot.getId());
|
||||
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -920,7 +920,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
}
|
||||
MemStoreSize mss = memstoreSizing.getMemStoreSize();
|
||||
regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(),
|
||||
mss.getOffHeapSize());
|
||||
mss.getOffHeapSize(), mss.getCellsCount());
|
||||
return mss.getDataSize();
|
||||
}
|
||||
|
||||
|
|
|
@ -265,7 +265,7 @@ public class TestHStore {
|
|||
MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
|
||||
store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
|
||||
// add the heap size of active (mutable) segment
|
||||
kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0);
|
||||
kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
|
||||
mss = store.memstore.getFlushableSize();
|
||||
assertEquals(kvSize.getMemStoreSize(), mss);
|
||||
// Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
|
||||
|
@ -278,12 +278,12 @@ public class TestHStore {
|
|||
}
|
||||
// due to snapshot, change mutable to immutable segment
|
||||
kvSize.incMemStoreSize(0,
|
||||
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD, 0);
|
||||
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
|
||||
mss = store.memstore.getFlushableSize();
|
||||
assertEquals(kvSize.getMemStoreSize(), mss);
|
||||
MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
|
||||
store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
|
||||
kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0);
|
||||
store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
|
||||
kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
|
||||
// Even though we add a new kv, we expect the flushable size to be 'same' since we have
|
||||
// not yet cleared the snapshot -- the above flush failed.
|
||||
assertEquals(kvSize.getMemStoreSize(), mss);
|
||||
|
|
|
@ -50,22 +50,18 @@ public class TestRegionServerAccounting {
|
|||
public void testOnheapMemstoreHigherWaterMarkLimits() {
|
||||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
long dataSize = regionServerAccounting.getGlobalMemStoreLimit();
|
||||
MemStoreSize memstoreSize =
|
||||
new MemStoreSize(dataSize, dataSize, 0);
|
||||
MemStoreSize memstoreSize = new MemStoreSize(dataSize, dataSize, 0, 0);
|
||||
regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK,
|
||||
regionServerAccounting.isAboveHighWaterMark());
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK, regionServerAccounting.isAboveHighWaterMark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnheapMemstoreLowerWaterMarkLimits() {
|
||||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
long dataSize = regionServerAccounting.getGlobalMemStoreLimit();
|
||||
MemStoreSize memstoreSize =
|
||||
new MemStoreSize(dataSize, dataSize, 0);
|
||||
MemStoreSize memstoreSize = new MemStoreSize(dataSize, dataSize, 0, 0);
|
||||
regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK,
|
||||
regionServerAccounting.isAboveLowWaterMark());
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK, regionServerAccounting.isAboveLowWaterMark());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -76,7 +72,7 @@ public class TestRegionServerAccounting {
|
|||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
// this will breach offheap limit as data size is higher and not due to heap size
|
||||
MemStoreSize memstoreSize =
|
||||
new MemStoreSize((3L * 1024L * 1024L * 1024L), 0, (1L * 1024L * 1024L * 1024L));
|
||||
new MemStoreSize((3L * 1024L * 1024L * 1024L), 0, (1L * 1024L * 1024L * 1024L), 100);
|
||||
regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_OFFHEAP_HIGHER_MARK,
|
||||
regionServerAccounting.isAboveHighWaterMark());
|
||||
|
@ -90,11 +86,9 @@ public class TestRegionServerAccounting {
|
|||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
// this will breach higher limit as heap size is higher and not due to offheap size
|
||||
long dataSize = regionServerAccounting.getGlobalOnHeapMemStoreLimit();
|
||||
MemStoreSize memstoreSize =
|
||||
new MemStoreSize(dataSize, dataSize, 0);
|
||||
MemStoreSize memstoreSize = new MemStoreSize(dataSize, dataSize, 0, 100);
|
||||
regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK,
|
||||
regionServerAccounting.isAboveHighWaterMark());
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK, regionServerAccounting.isAboveHighWaterMark());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -105,10 +99,9 @@ public class TestRegionServerAccounting {
|
|||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
// this will breach offheap limit as data size is higher and not due to heap size
|
||||
MemStoreSize memstoreSize =
|
||||
new MemStoreSize((3L * 1024L * 1024L * 1024L), 0, (1L * 1024L * 1024L * 1024L));
|
||||
new MemStoreSize((3L * 1024L * 1024L * 1024L), 0, (1L * 1024L * 1024L * 1024L), 100);
|
||||
regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_OFFHEAP_LOWER_MARK,
|
||||
regionServerAccounting.isAboveLowWaterMark());
|
||||
assertEquals(FlushType.ABOVE_OFFHEAP_LOWER_MARK, regionServerAccounting.isAboveLowWaterMark());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -119,10 +112,8 @@ public class TestRegionServerAccounting {
|
|||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
// this will breach higher limit as heap size is higher and not due to offheap size
|
||||
long dataSize = regionServerAccounting.getGlobalOnHeapMemStoreLimit();
|
||||
MemStoreSize memstoreSize =
|
||||
new MemStoreSize(dataSize, dataSize, 0);
|
||||
MemStoreSize memstoreSize = new MemStoreSize(dataSize, dataSize, 0, 100);
|
||||
regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK,
|
||||
regionServerAccounting.isAboveLowWaterMark());
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK, regionServerAccounting.isAboveLowWaterMark());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue