HBASE-20411 Ameliorate MutableSegment synchronize

Change the MemStore size accounting so we don't synchronize across three
volatiles applying deltas. Instead:

 + Make MemStoreSize, a datastructure of our memstore size longs, immutable.
 + Undo MemStoreSizing being an instance of MemStoreSize; instead it has-a.
 + Make two MemStoreSizing implementations; one thread-safe, the other not.
 + Let all memory sizing longs run independent, untied by
   synchronize (Huaxiang and Anoop suggestion) using atomiclongs.
 + Review all use of MemStoreSizing. Many are single-threaded and do
   not need to be synchronized; use the non-thread safe counter.

TODO: Use this technique accounting at the global level too.
This commit is contained in:
Michael Stack 2018-05-06 21:29:49 -07:00
parent c60578d982
commit 021f66d11d
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
28 changed files with 598 additions and 398 deletions

View File

@ -165,13 +165,7 @@ public abstract class AbstractMemStore implements MemStore {
@Override @Override
public MemStoreSize getSnapshotSize() { public MemStoreSize getSnapshotSize() {
return getSnapshotSizing(); return this.snapshot.getMemStoreSize();
}
MemStoreSizing getSnapshotSizing() {
return new MemStoreSizing(this.snapshot.keySize(),
this.snapshot.heapSize(),
this.snapshot.offHeapSize());
} }
@Override @Override

View File

@ -40,7 +40,7 @@ public class CSLMImmutableSegment extends ImmutableSegment {
super(segment); super(segment);
// update the segment metadata heap size // update the segment metadata heap size
long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM; long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM;
incSize(0, indexOverhead, 0); // CSLM is always on-heap incMemStoreSize(0, indexOverhead, 0); // CSLM is always on-heap
} }
@Override @Override

View File

@ -45,7 +45,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) { MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
incSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap incMemStoreSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
initializeCellSet(numOfCells, iterator, action); initializeCellSet(numOfCells, iterator, action);
} }
@ -59,7 +59,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
MemStoreCompactionStrategy.Action action) { MemStoreCompactionStrategy.Action action) {
super(segment); // initiailize the upper class super(segment); // initiailize the upper class
long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
incSize(0, indexOverhead, 0); // CAM is always on-heap incMemStoreSize(0, indexOverhead, 0); // CAM is always on-heap
int numOfCells = segment.getCellsCount(); int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap and update the CellSet of this Segment // build the new CellSet based on CellChunkMap and update the CellSet of this Segment
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(), reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
@ -67,7 +67,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
// arrange the meta-data size, decrease all meta-data sizes related to SkipList; // arrange the meta-data size, decrease all meta-data sizes related to SkipList;
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes) // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
incSize(0, newSegmentSizeDelta, 0); incMemStoreSize(0, newSegmentSizeDelta, 0);
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0); memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
} }

View File

@ -58,9 +58,9 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
boolean onHeap = getMemStoreLAB().isOnHeap(); boolean onHeap = getMemStoreLAB().isOnHeap();
// initiate the heapSize with the size of the segment metadata // initiate the heapSize with the size of the segment metadata
if(onHeap) { if(onHeap) {
incSize(0, indexOverhead, 0); incMemStoreSize(0, indexOverhead, 0);
} else { } else {
incSize(0, 0, indexOverhead); incMemStoreSize(0, 0, indexOverhead);
} }
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
initializeCellSet(numOfCells, iterator, action); initializeCellSet(numOfCells, iterator, action);
@ -79,9 +79,9 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
boolean onHeap = getMemStoreLAB().isOnHeap(); boolean onHeap = getMemStoreLAB().isOnHeap();
// initiate the heapSize with the size of the segment metadata // initiate the heapSize with the size of the segment metadata
if(onHeap) { if(onHeap) {
incSize(0, indexOverhead, 0); incMemStoreSize(0, indexOverhead, 0);
} else { } else {
incSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM); incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM);
} }
int numOfCells = segment.getCellsCount(); int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap // build the new CellSet based on CellChunkMap
@ -92,10 +92,10 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// (reinitializeCellSet doesn't take the care for the sizes) // (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
if(onHeap) { if(onHeap) {
incSize(0, newSegmentSizeDelta, 0); incMemStoreSize(0, newSegmentSizeDelta, 0);
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0); memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
} else { } else {
incSize(0, 0, newSegmentSizeDelta); incMemStoreSize(0, 0, newSegmentSizeDelta);
memstoreSizing.incMemStoreSize(0, 0, newSegmentSizeDelta); memstoreSizing.incMemStoreSize(0, 0, newSegmentSizeDelta);
} }
@ -333,7 +333,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
long heapOverhead = newHeapSize - oldHeapSize; long heapOverhead = newHeapSize - oldHeapSize;
long offHeapOverhead = newOffHeapSize - oldOffHeapSize; long offHeapOverhead = newOffHeapSize - oldOffHeapSize;
//TODO: maybe need to update the dataSize of the region //TODO: maybe need to update the dataSize of the region
incSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead); incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
return cell; return cell;
} }
} }

View File

@ -155,12 +155,12 @@ public class CompactingMemStore extends AbstractMemStore {
*/ */
@Override @Override
public MemStoreSize size() { public MemStoreSize size() {
MemStoreSizing memstoreSizing = new MemStoreSizing(); MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
memstoreSizing.incMemStoreSize(active.getMemStoreSize()); memstoreSizing.incMemStoreSize(active.getMemStoreSize());
for (Segment item : pipeline.getSegments()) { for (Segment item : pipeline.getSegments()) {
memstoreSizing.incMemStoreSize(item.getMemStoreSize()); memstoreSizing.incMemStoreSize(item.getMemStoreSize());
} }
return memstoreSizing; return memstoreSizing.getMemStoreSize();
} }
/** /**
@ -216,42 +216,38 @@ public class CompactingMemStore extends AbstractMemStore {
return new MemStoreSnapshot(snapshotId, this.snapshot); return new MemStoreSnapshot(snapshotId, this.snapshot);
} }
/**
* On flush, how much memory we will clear.
* @return size of data that is going to be flushed
*/
@Override @Override
public MemStoreSize getFlushableSize() { public MemStoreSize getFlushableSize() {
MemStoreSizing snapshotSizing = getSnapshotSizing(); MemStoreSize mss = getSnapshotSize();
if (snapshotSizing.getDataSize() == 0) { if (mss.getDataSize() == 0) {
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
if (compositeSnapshot) { if (compositeSnapshot) {
snapshotSizing = pipeline.getPipelineSizing(); MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
snapshotSizing.incMemStoreSize(active.getMemStoreSize()); memStoreSizing.incMemStoreSize(this.active.getMemStoreSize());
mss = memStoreSizing.getMemStoreSize();
} else { } else {
snapshotSizing = pipeline.getTailSizing(); mss = pipeline.getTailSize();
} }
} }
return snapshotSizing.getDataSize() > 0 ? snapshotSizing return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
: new MemStoreSize(active.getMemStoreSize());
} }
@Override @Override
protected long keySize() { protected long keySize() {
// Need to consider keySize of all segments in pipeline and active // Need to consider dataSize/keySize of all segments in pipeline and active
long k = this.active.keySize(); long keySize = this.active.getDataSize();
for (Segment segment : this.pipeline.getSegments()) { for (Segment segment : this.pipeline.getSegments()) {
k += segment.keySize(); keySize += segment.getDataSize();
} }
return k; return keySize;
} }
@Override @Override
protected long heapSize() { protected long heapSize() {
// Need to consider heapOverhead of all segments in pipeline and active // Need to consider heapOverhead of all segments in pipeline and active
long h = this.active.heapSize(); long h = this.active.getHeapSize();
for (Segment segment : this.pipeline.getSegments()) { for (Segment segment : this.pipeline.getSegments()) {
h += segment.heapSize(); h += segment.getHeapSize();
} }
return h; return h;
} }
@ -447,7 +443,7 @@ public class CompactingMemStore extends AbstractMemStore {
@VisibleForTesting @VisibleForTesting
protected boolean shouldFlushInMemory() { protected boolean shouldFlushInMemory() {
if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold
if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush
return false; // regardless the size return false; // regardless the size
} }
@ -571,7 +567,7 @@ public class CompactingMemStore extends AbstractMemStore {
// debug method // debug method
public void debug() { public void debug() {
String msg = "active size=" + this.active.keySize(); String msg = "active size=" + this.active.getDataSize();
msg += " in-memory flush size is "+ inmemoryFlushSize; msg += " in-memory flush size is "+ inmemoryFlushSize;
msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false"); msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false"); msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");

View File

@ -23,11 +23,12 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
/** /**
* The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
@ -135,19 +136,21 @@ public class CompactionPipeline {
// update the global memstore size counter // update the global memstore size counter
long suffixDataSize = getSegmentsKeySize(suffix); long suffixDataSize = getSegmentsKeySize(suffix);
long newDataSize = 0; long newDataSize = 0;
if(segment != null) newDataSize = segment.keySize(); if(segment != null) {
newDataSize = segment.getDataSize();
}
long dataSizeDelta = suffixDataSize - newDataSize; long dataSizeDelta = suffixDataSize - newDataSize;
long suffixHeapSize = getSegmentsHeapSize(suffix); long suffixHeapSize = getSegmentsHeapSize(suffix);
long suffixOffHeapSize = getSegmentsOffHeapSize(suffix); long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
long newHeapSize = 0; long newHeapSize = 0;
long newOffHeapSize = 0; long newOffHeapSize = 0;
if(segment != null) { if(segment != null) {
newHeapSize = segment.heapSize(); newHeapSize = segment.getHeapSize();
newOffHeapSize = segment.offHeapSize(); newOffHeapSize = segment.getOffHeapSize();
} }
long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize; long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
long heapSizeDelta = suffixHeapSize - newHeapSize; long heapSizeDelta = suffixHeapSize - newHeapSize;
region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta)); region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
LOG.debug("Suffix data size={}, new segment data size={}, " LOG.debug("Suffix data size={}, new segment data size={}, "
+ "suffix heap size={}," + "new segment heap size={}" + "suffix heap size={}," + "new segment heap size={}"
+ "suffix off heap size={}," + "new segment off heap size={}" + "suffix off heap size={}," + "new segment off heap size={}"
@ -164,7 +167,7 @@ public class CompactionPipeline {
private static long getSegmentsHeapSize(List<? extends Segment> list) { private static long getSegmentsHeapSize(List<? extends Segment> list) {
long res = 0; long res = 0;
for (Segment segment : list) { for (Segment segment : list) {
res += segment.heapSize(); res += segment.getHeapSize();
} }
return res; return res;
} }
@ -172,7 +175,7 @@ public class CompactionPipeline {
private static long getSegmentsOffHeapSize(List<? extends Segment> list) { private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
long res = 0; long res = 0;
for (Segment segment : list) { for (Segment segment : list) {
res += segment.offHeapSize(); res += segment.getOffHeapSize();
} }
return res; return res;
} }
@ -180,7 +183,7 @@ public class CompactionPipeline {
private static long getSegmentsKeySize(List<? extends Segment> list) { private static long getSegmentsKeySize(List<? extends Segment> list) {
long res = 0; long res = 0;
for (Segment segment : list) { for (Segment segment : list) {
res += segment.keySize(); res += segment.getDataSize();
} }
return res; return res;
} }
@ -211,15 +214,17 @@ public class CompactionPipeline {
int i = 0; int i = 0;
for (ImmutableSegment s : pipeline) { for (ImmutableSegment s : pipeline) {
if ( s.canBeFlattened() ) { if ( s.canBeFlattened() ) {
MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated // size to be updated
MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action); (CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action);
replaceAtIndex(i,newS); replaceAtIndex(i,newS);
if(region != null) { if(region != null) {
// update the global memstore size counter // Update the global memstore size counter upon flattening there is no change in the
// upon flattening there is no change in the data size // data size
region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize(), MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
newMemstoreAccounting.getOffHeapSize())); Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!");
region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
} }
LOG.debug("Compaction pipeline segment {} flattened", s); LOG.debug("Compaction pipeline segment {} flattened", s);
return true; return true;
@ -254,19 +259,18 @@ public class CompactionPipeline {
return minSequenceId; return minSequenceId;
} }
public MemStoreSizing getTailSizing() { public MemStoreSize getTailSize() {
LinkedList<? extends Segment> localCopy = readOnlyCopy; LinkedList<? extends Segment> localCopy = readOnlyCopy;
if (localCopy.isEmpty()) return new MemStoreSizing(); return localCopy.isEmpty()? new MemStoreSize(): localCopy.peekLast().getMemStoreSize();
return new MemStoreSizing(localCopy.peekLast().getMemStoreSize());
} }
public MemStoreSizing getPipelineSizing() { public MemStoreSize getPipelineSize() {
MemStoreSizing memStoreSizing = new MemStoreSizing(); MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
LinkedList<? extends Segment> localCopy = readOnlyCopy; LinkedList<? extends Segment> localCopy = readOnlyCopy;
for (Segment segment : localCopy) { for (Segment segment : localCopy) {
memStoreSizing.incMemStoreSize(segment.getMemStoreSize()); memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
} }
return memStoreSizing; return memStoreSizing.getMemStoreSize();
} }
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment, private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,

View File

@ -48,7 +48,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
for (ImmutableSegment s : segments) { for (ImmutableSegment s : segments) {
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax()); this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin()); this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
this.keySize += s.keySize(); this.keySize += s.getDataSize();
} }
} }
@ -170,7 +170,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
* @return Sum of all cell sizes. * @return Sum of all cell sizes.
*/ */
@Override @Override
public long keySize() { public long getDataSize() {
return this.keySize; return this.keySize;
} }
@ -178,10 +178,10 @@ public class CompositeImmutableSegment extends ImmutableSegment {
* @return The heap size of this segment. * @return The heap size of this segment.
*/ */
@Override @Override
public long heapSize() { public long getHeapSize() {
long result = 0; long result = 0;
for (ImmutableSegment s : segments) { for (ImmutableSegment s : segments) {
result += s.heapSize(); result += s.getHeapSize();
} }
return result; return result;
} }
@ -190,7 +190,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
* Updates the heap size counter of the segment by the given delta * Updates the heap size counter of the segment by the given delta
*/ */
@Override @Override
protected void incSize(long delta, long heapOverhead, long offHeapOverhead) { public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner"); throw new IllegalStateException("Not supported by CompositeImmutableScanner");
} }

View File

@ -97,26 +97,20 @@ public class DefaultMemStore extends AbstractMemStore {
return new MemStoreSnapshot(this.snapshotId, this.snapshot); return new MemStoreSnapshot(this.snapshotId, this.snapshot);
} }
/**
* On flush, how much memory we will clear from the active cell set.
*
* @return size of data that is going to be flushed from active set
*/
@Override @Override
public MemStoreSize getFlushableSize() { public MemStoreSize getFlushableSize() {
MemStoreSize snapshotSize = getSnapshotSize(); MemStoreSize mss = getSnapshotSize();
return snapshotSize.getDataSize() > 0 ? snapshotSize return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
: new MemStoreSize(active.getMemStoreSize());
} }
@Override @Override
protected long keySize() { protected long keySize() {
return this.active.keySize(); return this.active.getDataSize();
} }
@Override @Override
protected long heapSize() { protected long heapSize() {
return this.active.heapSize(); return this.active.getHeapSize();
} }
@Override @Override
@ -154,7 +148,7 @@ public class DefaultMemStore extends AbstractMemStore {
@Override @Override
public MemStoreSize size() { public MemStoreSize size() {
return new MemStoreSize(active.getMemStoreSize()); return active.getMemStoreSize();
} }
/** /**
@ -193,26 +187,27 @@ public class DefaultMemStore extends AbstractMemStore {
byte [] fam = Bytes.toBytes("col"); byte [] fam = Bytes.toBytes("col");
byte [] qf = Bytes.toBytes("umn"); byte [] qf = Bytes.toBytes("umn");
byte [] empty = new byte[0]; byte [] empty = new byte[0];
MemStoreSizing memstoreSizing = new MemStoreSizing(); MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
// Give each its own ts // Give each its own ts
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing); memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
} }
LOG.info("memstore1 estimated size=" LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
+ (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize())); memStoreSizing.getMemStoreSize().getHeapSize());
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing); memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
} }
LOG.info("memstore1 estimated size (2nd loading of same data)=" LOG.info("memstore1 estimated size (2nd loading of same data)={}",
+ (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize())); memStoreSizing.getMemStoreSize().getDataSize() +
memStoreSizing.getMemStoreSize().getHeapSize());
// Make a variably sized memstore. // Make a variably sized memstore.
DefaultMemStore memstore2 = new DefaultMemStore(); DefaultMemStore memstore2 = new DefaultMemStore();
memstoreSizing = new MemStoreSizing(); memStoreSizing = new NonThreadSafeMemStoreSizing();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSizing); memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing);
} }
LOG.info("memstore2 estimated size=" LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
+ (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize())); memStoreSizing.getMemStoreSize().getHeapSize());
final int seconds = 30; final int seconds = 30;
LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
LOG.info("Exiting."); LOG.info("Exiting.");

View File

@ -292,7 +292,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap(); private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
// Track data size in all memstores // Track data size in all memstores
private final MemStoreSizing memStoreSize = new MemStoreSizing(); private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this); private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
// Debug possible data loss due to WAL off // Debug possible data loss due to WAL off
@ -1210,36 +1210,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Increase the size of mem store in this region and the size of global mem * Increase the size of mem store in this region and the size of global mem
* store * store
*/ */
public void incMemStoreSize(MemStoreSize memStoreSize) { void incMemStoreSize(MemStoreSize mss) {
if (this.rsAccounting != null) { incMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
rsAccounting.incGlobalMemStoreSize(memStoreSize);
}
long dataSize;
synchronized (this.memStoreSize) {
this.memStoreSize.incMemStoreSize(memStoreSize);
dataSize = this.memStoreSize.getDataSize();
}
checkNegativeMemStoreDataSize(dataSize, memStoreSize.getDataSize());
} }
public void decrMemStoreSize(MemStoreSize memStoreSize) { void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
if (this.rsAccounting != null) { if (this.rsAccounting != null) {
rsAccounting.decGlobalMemStoreSize(memStoreSize); rsAccounting.incGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
} }
long size; long dataSize =
synchronized (this.memStoreSize) { this.memStoreSizing.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
this.memStoreSize.decMemStoreSize(memStoreSize); checkNegativeMemStoreDataSize(dataSize, dataSizeDelta);
size = this.memStoreSize.getDataSize(); }
void decrMemStoreSize(MemStoreSize mss) {
decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
}
void decrMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
if (this.rsAccounting != null) {
rsAccounting.decGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
} }
checkNegativeMemStoreDataSize(size, -memStoreSize.getDataSize()); long dataSize =
this.memStoreSizing.decMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
checkNegativeMemStoreDataSize(dataSize, -dataSizeDelta);
} }
private void checkNegativeMemStoreDataSize(long memStoreDataSize, long delta) { private void checkNegativeMemStoreDataSize(long memStoreDataSize, long delta) {
// This is extremely bad if we make memStoreSize negative. Log as much info on the offending // This is extremely bad if we make memStoreSizing negative. Log as much info on the offending
// caller as possible. (memStoreSize might be a negative value already -- freeing memory) // caller as possible. (memStoreSizing might be a negative value already -- freeing memory)
if (memStoreDataSize < 0) { if (memStoreDataSize < 0) {
LOG.error("Asked to modify this region's (" + this.toString() LOG.error("Asked to modify this region's (" + this.toString()
+ ") memStoreSize to a negative value which is incorrect. Current memStoreSize=" + ") memStoreSizing to a negative value which is incorrect. Current memStoreSizing="
+ (memStoreDataSize - delta) + ", delta=" + delta, new Exception()); + (memStoreDataSize - delta) + ", delta=" + delta, new Exception());
} }
} }
@ -1274,17 +1276,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override @Override
public long getMemStoreDataSize() { public long getMemStoreDataSize() {
return memStoreSize.getDataSize(); return memStoreSizing.getDataSize();
} }
@Override @Override
public long getMemStoreHeapSize() { public long getMemStoreHeapSize() {
return memStoreSize.getHeapSize(); return memStoreSizing.getHeapSize();
} }
@Override @Override
public long getMemStoreOffHeapSize() { public long getMemStoreOffHeapSize() {
return memStoreSize.getOffHeapSize(); return memStoreSizing.getOffHeapSize();
} }
/** @return store services for this region, to access services required by store level needs */ /** @return store services for this region, to access services required by store level needs */
@ -1555,7 +1557,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int failedfFlushCount = 0; int failedfFlushCount = 0;
int flushCount = 0; int flushCount = 0;
long tmp = 0; long tmp = 0;
long remainingSize = this.memStoreSize.getDataSize(); long remainingSize = this.memStoreSizing.getDataSize();
while (remainingSize > 0) { while (remainingSize > 0) {
try { try {
internalFlushcache(status); internalFlushcache(status);
@ -1564,7 +1566,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
" (carrying snapshot?) " + this); " (carrying snapshot?) " + this);
} }
flushCount++; flushCount++;
tmp = this.memStoreSize.getDataSize(); tmp = this.memStoreSizing.getDataSize();
if (tmp >= remainingSize) { if (tmp >= remainingSize) {
failedfFlushCount++; failedfFlushCount++;
} }
@ -1598,13 +1600,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// close each store in parallel // close each store in parallel
for (HStore store : stores.values()) { for (HStore store : stores.values()) {
MemStoreSize flushableSize = store.getFlushableSize(); MemStoreSize mss = store.getFlushableSize();
if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) { if (!(abort || mss.getDataSize() == 0 || writestate.readOnly)) {
if (getRegionServerServices() != null) { if (getRegionServerServices() != null) {
getRegionServerServices().abort("Assertion failed while closing store " getRegionServerServices().abort("Assertion failed while closing store "
+ getRegionInfo().getRegionNameAsString() + " " + store + getRegionInfo().getRegionNameAsString() + " " + store
+ ". flushableSize expected=0, actual= " + flushableSize + ". flushableSize expected=0, actual={" + mss
+ ". Current memStoreSize=" + getMemStoreDataSize() + ". Maybe a coprocessor " + "}. Current memStoreSize=" + this.memStoreSizing.getMemStoreSize() +
". Maybe a coprocessor "
+ "operation failed and left the memstore in a partially updated state.", null); + "operation failed and left the memstore in a partially updated state.", null);
} }
} }
@ -1647,9 +1650,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.closed.set(true); this.closed.set(true);
if (!canFlush) { if (!canFlush) {
this.decrMemStoreSize(new MemStoreSize(memStoreSize)); decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
} else if (memStoreSize.getDataSize() != 0) { } else if (this.memStoreSizing.getDataSize() != 0) {
LOG.error("Memstore data size is " + memStoreSize.getDataSize()); LOG.error("Memstore data size is {}", this.memStoreSizing.getDataSize());
} }
if (coprocessorHost != null) { if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-close hooks"); status.setStatus("Running coprocessor post-close hooks");
@ -1782,7 +1785,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return True if its worth doing a flush before we put up the close flag. * @return True if its worth doing a flush before we put up the close flag.
*/ */
private boolean worthPreFlushing() { private boolean worthPreFlushing() {
return this.memStoreSize.getDataSize() > return this.memStoreSizing.getDataSize() >
this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5); this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
} }
@ -2400,12 +2403,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// bulk loaded file between memory and existing hfiles. It wants a good seqeunceId that belongs // bulk loaded file between memory and existing hfiles. It wants a good seqeunceId that belongs
// to no other that it can use to associate with the bulk load. Hence this little dance below // to no other that it can use to associate with the bulk load. Hence this little dance below
// to go get one. // to go get one.
if (this.memStoreSize.getDataSize() <= 0) { if (this.memStoreSizing.getDataSize() <= 0) {
// Take an update lock so no edits can come into memory just yet. // Take an update lock so no edits can come into memory just yet.
this.updatesLock.writeLock().lock(); this.updatesLock.writeLock().lock();
WriteEntry writeEntry = null; WriteEntry writeEntry = null;
try { try {
if (this.memStoreSize.getDataSize() <= 0) { if (this.memStoreSizing.getDataSize() <= 0) {
// Presume that if there are still no edits in the memstore, then there are no edits for // Presume that if there are still no edits in the memstore, then there are no edits for
// this region out in the WAL subsystem so no need to do any trickery clearing out // this region out in the WAL subsystem so no need to do any trickery clearing out
// edits in the WAL sub-system. Up the sequence number so the resulting flush id is for // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
@ -2447,7 +2450,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// block waiting for the lock for internal flush // block waiting for the lock for internal flush
this.updatesLock.writeLock().lock(); this.updatesLock.writeLock().lock();
status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName()); status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
MemStoreSizing totalSizeOfFlushableStores = new MemStoreSizing(); MemStoreSizing totalSizeOfFlushableStores = new NonThreadSafeMemStoreSizing();
Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>(); Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
for (HStore store : storesToFlush) { for (HStore store : storesToFlush) {
@ -2536,14 +2539,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!isAllFamilies(storesToFlush)) { if (!isAllFamilies(storesToFlush)) {
perCfExtras = new StringBuilder(); perCfExtras = new StringBuilder();
for (HStore store: storesToFlush) { for (HStore store: storesToFlush) {
MemStoreSize mss = store.getFlushableSize();
perCfExtras.append("; ").append(store.getColumnFamilyName()); perCfExtras.append("; ").append(store.getColumnFamilyName());
perCfExtras.append("=") perCfExtras.append("={dataSize=")
.append(StringUtils.byteDesc(store.getFlushableSize().getDataSize())); .append(StringUtils.byteDesc(mss.getDataSize()));
perCfExtras.append(", heapSize=")
.append(StringUtils.byteDesc(mss.getHeapSize()));
perCfExtras.append(", offHeapSize=")
.append(StringUtils.byteDesc(mss.getOffHeapSize()));
perCfExtras.append("}");
} }
} }
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," + LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," +
" memstore data size=" + StringUtils.byteDesc(this.memStoreSize.getDataSize()) + " memstore data size=" + StringUtils.byteDesc(mss.getDataSize()) +
" memstore heap size=" + StringUtils.byteDesc(this.memStoreSize.getHeapSize()) + " memstore heap size=" + StringUtils.byteDesc(mss.getHeapSize()) +
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId)); ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
} }
@ -2663,7 +2673,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
storeFlushCtxs.clear(); storeFlushCtxs.clear();
// Set down the memstore size by amount of flush. // Set down the memstore size by amount of flush.
this.decrMemStoreSize(prepareResult.totalFlushableSize); MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
this.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
// Increase the size of this Region for the purposes of quota. Noop if quotas are disabled. // Increase the size of this Region for the purposes of quota. Noop if quotas are disabled.
// During startup, quota manager may not be initialized yet. // During startup, quota manager may not be initialized yet.
@ -2740,12 +2751,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
long time = EnvironmentEdgeManager.currentTime() - startTime; long time = EnvironmentEdgeManager.currentTime() - startTime;
long flushableDataSize = prepareResult.totalFlushableSize.getDataSize(); MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
long flushableHeapSize = prepareResult.totalFlushableSize.getHeapSize(); long memstoresize = this.memStoreSizing.getMemStoreSize().getDataSize();
long memstoresize = this.memStoreSize.getDataSize();
String msg = "Finished memstore flush;" String msg = "Finished memstore flush;"
+ " data size ~" + StringUtils.byteDesc(flushableDataSize) + "/" + flushableDataSize + " data size ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize()
+ ", heap size ~" + StringUtils.byteDesc(flushableHeapSize) + "/" + flushableHeapSize + ", heap size ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize()
+ ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+ " for " + this.getRegionInfo().getEncodedName() + " in " + time + "ms, sequenceid=" + " for " + this.getRegionInfo().getEncodedName() + " in " + time + "ms, sequenceid="
+ flushOpSeqId + ", compaction requested=" + compactionRequested + flushOpSeqId + ", compaction requested=" + compactionRequested
@ -2755,7 +2765,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (rsServices != null && rsServices.getMetrics() != null) { if (rsServices != null && rsServices.getMetrics() != null) {
rsServices.getMetrics().updateFlush(time - startTime, rsServices.getMetrics().updateFlush(time - startTime,
prepareResult.totalFlushableSize.getDataSize(), flushedOutputFileSize); mss.getDataSize(), flushedOutputFileSize);
} }
return new FlushResultImpl(compactionRequested ? return new FlushResultImpl(compactionRequested ?
@ -3067,7 +3077,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected void writeMiniBatchOperationsToMemStore( protected void writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber) final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)
throws IOException { throws IOException {
MemStoreSizing memStoreAccounting = new MemStoreSizing(); MemStoreSizing memStoreAccounting = new NonThreadSafeMemStoreSizing();
visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> { visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
// We need to update the sequence id for following reasons. // We need to update the sequence id for following reasons.
// 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
@ -3080,7 +3090,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return true; return true;
}); });
// update memStore size // update memStore size
region.incMemStoreSize(memStoreAccounting); region.incMemStoreSize(memStoreAccounting.getDataSize(), memStoreAccounting.getHeapSize(),
memStoreAccounting.getOffHeapSize());
} }
public boolean isDone() { public boolean isDone() {
@ -4274,8 +4285,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If catalog region, do not impose resource constraints or block updates. // If catalog region, do not impose resource constraints or block updates.
if (this.getRegionInfo().isMetaRegion()) return; if (this.getRegionInfo().isMetaRegion()) return;
if (this.memStoreSize.getHeapSize() MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
+ this.memStoreSize.getOffHeapSize() > this.blockingMemStoreSize) { if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) {
blockedRequestsCount.increment(); blockedRequestsCount.increment();
requestFlush(); requestFlush();
// Don't print current limit because it will vary too much. The message is used as a key // Don't print current limit because it will vary too much. The message is used as a key
@ -4645,7 +4656,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
boolean flush = false; boolean flush = false;
MemStoreSizing memstoreSize = new MemStoreSizing(); MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
for (Cell cell: val.getCells()) { for (Cell cell: val.getCells()) {
// Check this edit is for me. Also, guard against writing the special // Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries // METACOLUMN info such as HBASE::CACHEFLUSH entries
@ -4688,15 +4699,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
PrivateCellUtil.setSequenceId(cell, currentReplaySeqId); PrivateCellUtil.setSequenceId(cell, currentReplaySeqId);
restoreEdit(store, cell, memstoreSize); restoreEdit(store, cell, memStoreSizing);
editsCount++; editsCount++;
} }
MemStoreSize mss = memStoreSizing.getMemStoreSize();
if (this.rsAccounting != null) { if (this.rsAccounting != null) {
rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss);
memstoreSize);
} }
incMemStoreSize(memstoreSize); incMemStoreSize(mss);
flush = isFlushSize(this.memStoreSize); flush = isFlushSize(this.memStoreSizing.getMemStoreSize());
if (flush) { if (flush) {
internalFlushcache(null, currentEditSeqId, stores.values(), status, false, internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
FlushLifeCycleTracker.DUMMY); FlushLifeCycleTracker.DUMMY);
@ -5006,8 +5017,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
replayFlushInStores(flush, prepareFlushResult, true); replayFlushInStores(flush, prepareFlushResult, true);
// Set down the memstore size by amount of flush. // Set down the memstore size by amount of flush.
this.decrMemStoreSize(prepareFlushResult.totalFlushableSize); this.decrMemStoreSize(prepareFlushResult.totalFlushableSize.getMemStoreSize());
this.prepareFlushResult = null; this.prepareFlushResult = null;
writestate.flushing = false; writestate.flushing = false;
} else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) { } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
@ -5039,7 +5049,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
replayFlushInStores(flush, prepareFlushResult, true); replayFlushInStores(flush, prepareFlushResult, true);
// Set down the memstore size by amount of flush. // Set down the memstore size by amount of flush.
this.decrMemStoreSize(prepareFlushResult.totalFlushableSize); this.decrMemStoreSize(prepareFlushResult.totalFlushableSize.getMemStoreSize());
// Inspect the memstore contents to see whether the memstore contains only edits // Inspect the memstore contents to see whether the memstore contains only edits
// with seqId smaller than the flush seqId. If so, we can discard those edits. // with seqId smaller than the flush seqId. If so, we can discard those edits.
@ -5143,7 +5153,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException * @throws IOException
*/ */
private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException { private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException {
MemStoreSizing totalFreedSize = new MemStoreSizing(); MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
this.updatesLock.writeLock().lock(); this.updatesLock.writeLock().lock();
try { try {
@ -5170,7 +5180,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally { } finally {
this.updatesLock.writeLock().unlock(); this.updatesLock.writeLock().unlock();
} }
return totalFreedSize; return totalFreedSize.getMemStoreSize();
} }
private MemStoreSize doDropStoreMemStoreContentsForSeqId(HStore s, long currentSeqId) private MemStoreSize doDropStoreMemStoreContentsForSeqId(HStore s, long currentSeqId)
@ -5293,9 +5303,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ? StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
null : this.prepareFlushResult.storeFlushCtxs.get(family); null : this.prepareFlushResult.storeFlushCtxs.get(family);
if (ctx != null) { if (ctx != null) {
MemStoreSize snapshotSize = store.getFlushableSize(); MemStoreSize mss = store.getFlushableSize();
ctx.abort(); ctx.abort();
this.decrMemStoreSize(snapshotSize); this.decrMemStoreSize(mss);
this.prepareFlushResult.storeFlushCtxs.remove(family); this.prepareFlushResult.storeFlushCtxs.remove(family);
} }
} }
@ -5487,12 +5497,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
null : this.prepareFlushResult.storeFlushCtxs.get( null : this.prepareFlushResult.storeFlushCtxs.get(
store.getColumnFamilyDescriptor().getName()); store.getColumnFamilyDescriptor().getName());
if (ctx != null) { if (ctx != null) {
MemStoreSize snapshotSize = store.getFlushableSize(); MemStoreSize mss = store.getFlushableSize();
ctx.abort(); ctx.abort();
this.decrMemStoreSize(snapshotSize); this.decrMemStoreSize(mss);
this.prepareFlushResult.storeFlushCtxs.remove( this.prepareFlushResult.storeFlushCtxs.
store.getColumnFamilyDescriptor().getName()); remove(store.getColumnFamilyDescriptor().getName());
totalFreedDataSize += snapshotSize.getDataSize(); totalFreedDataSize += mss.getDataSize();
} }
} }
} }
@ -7374,8 +7384,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return null; return null;
} }
ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder(); ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
stats.setMemStoreLoad((int) (Math.min(100, (this.memStoreSize.getHeapSize() * 100) / this stats.setMemStoreLoad((int) (Math.min(100,
.memstoreFlushSize))); (this.memStoreSizing.getMemStoreSize().getHeapSize() * 100) / this.memstoreFlushSize)));
if (rsServices.getHeapMemoryManager() != null) { if (rsServices.getHeapMemoryManager() != null) {
// the HeapMemoryManager uses -0.0 to signal a problem asking the JVM, // the HeapMemoryManager uses -0.0 to signal a problem asking the JVM,
// so we could just do the calculation below and we'll get a 0. // so we could just do the calculation below and we'll get a 0.
@ -7436,7 +7446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This is assigned by mvcc either explicity in the below or in the guts of the WAL append // This is assigned by mvcc either explicity in the below or in the guts of the WAL append
// when it assigns the edit a sequencedid (A.K.A the mvcc write number). // when it assigns the edit a sequencedid (A.K.A the mvcc write number).
WriteEntry writeEntry = null; WriteEntry writeEntry = null;
MemStoreSizing memstoreAccounting = new MemStoreSizing(); MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
try { try {
boolean success = false; boolean success = false;
try { try {
@ -7522,7 +7532,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally { } finally {
closeRegionOperation(); closeRegionOperation();
if (!mutations.isEmpty()) { if (!mutations.isEmpty()) {
this.incMemStoreSize(memstoreAccounting); this.incMemStoreSize(memstoreAccounting.getMemStoreSize());
requestFlushIfNeeded(); requestFlushIfNeeded();
} }
} }
@ -7626,7 +7636,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(op); startRegionOperation(op);
List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null; List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null;
RowLock rowLock = null; RowLock rowLock = null;
MemStoreSizing memstoreAccounting = new MemStoreSizing(); MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
try { try {
rowLock = getRowLockInternal(mutation.getRow(), false, null); rowLock = getRowLockInternal(mutation.getRow(), false, null);
lock(this.updatesLock.readLock()); lock(this.updatesLock.readLock());
@ -7676,7 +7686,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowLock.release(); rowLock.release();
} }
// Request a cache flush if over the limit. Do it outside update lock. // Request a cache flush if over the limit. Do it outside update lock.
incMemStoreSize(memstoreAccounting); incMemStoreSize(memstoreAccounting.getMemStoreSize());
requestFlushIfNeeded(); requestFlushIfNeeded();
closeRegionOperation(op); closeRegionOperation(op);
if (this.metricsRegion != null) { if (this.metricsRegion != null) {
@ -8557,7 +8567,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
private void requestFlushIfNeeded() throws RegionTooBusyException { private void requestFlushIfNeeded() throws RegionTooBusyException {
if(isFlushSize(memStoreSize)) { if(isFlushSize(this.memStoreSizing.getMemStoreSize())) {
requestFlush(); requestFlush();
} }
} }

View File

@ -2272,7 +2272,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
this.cacheFlushCount = snapshot.getCellsCount(); this.cacheFlushCount = snapshot.getCellsCount();
this.cacheFlushSize = snapshot.getDataSize(); this.cacheFlushSize = snapshot.getDataSize();
committedFiles = new ArrayList<>(1); committedFiles = new ArrayList<>(1);
return new MemStoreSize(snapshot.getMemStoreSize()); return snapshot.getMemStoreSize();
} }
@Override @Override

View File

@ -50,12 +50,11 @@ public interface MemStore {
void clearSnapshot(long id) throws UnexpectedStateException; void clearSnapshot(long id) throws UnexpectedStateException;
/** /**
* On flush, how much memory we will clear.
* Flush will first clear out the data in snapshot if any (It will take a second flush * Flush will first clear out the data in snapshot if any (It will take a second flush
* invocation to clear the current Cell set). If snapshot is empty, current * invocation to clear the current Cell set). If snapshot is empty, current
* Cell set will be flushed. * Cell set will be flushed.
* *
* @return size of data that is going to be flushed * @return On flush, how much memory we will clear.
*/ */
MemStoreSize getFlushableSize(); MemStoreSize getFlushableSize();

View File

@ -555,8 +555,9 @@ class MemStoreFlusher implements FlushRequester {
// If this is first time we've been put off, then emit a log message. // If this is first time we've been put off, then emit a log message.
if (fqe.getRequeueCount() <= 0) { if (fqe.getRequeueCount() <= 0) {
// Note: We don't impose blockingStoreFiles constraint on meta regions // Note: We don't impose blockingStoreFiles constraint on meta regions
LOG.warn("Region " + region.getRegionInfo().getEncodedName() + " has too many " + LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
"store files; delaying flush up to " + this.blockingWaitTime + "ms"); region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
this.blockingWaitTime);
if (!this.server.compactSplitThread.requestSplit(region)) { if (!this.server.compactSplitThread.requestSplit(region)) {
try { try {
this.server.compactSplitThread.requestSystemCompaction(region, this.server.compactSplitThread.requestSystemCompaction(region,
@ -677,6 +678,14 @@ class MemStoreFlusher implements FlushRequester {
return false; return false;
} }
private int getStoreFileCount(Region region) {
int count = 0;
for (Store store : region.getStores()) {
count += store.getStorefilesCount();
}
return count;
}
/** /**
* Check if the regionserver's memstore memory usage is greater than the * Check if the regionserver's memstore memory usage is greater than the
* limit. If so, flush regions with the biggest memstores until we're down * limit. If so, flush regions with the biggest memstores until we're down
@ -760,10 +769,10 @@ class MemStoreFlusher implements FlushRequester {
} }
} }
private void logMsg(String string1, long val, long max) { private void logMsg(String type, long val, long max) {
LOG.info("Blocking updates on " + server.toString() + ": " + string1 + " " LOG.info("Blocking updates: {} {} is >= blocking {}", type,
+ TraditionalBinaryPrefix.long2String(val, "", 1) + " is >= than blocking " TraditionalBinaryPrefix.long2String(val, "", 1),
+ TraditionalBinaryPrefix.long2String(max, "", 1) + " size"); TraditionalBinaryPrefix.long2String(max, "", 1));
} }
@Override @Override

View File

@ -19,64 +19,56 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Reports the data size part and total heap space occupied by the MemStore. * Data structure of three longs.
* Read-only. * Convenient package in which to carry current state of three counters.
* <p>Immutable!</p>
* @see MemStoreSizing * @see MemStoreSizing
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public class MemStoreSize { public class MemStoreSize {
// MemStore size tracks 3 sizes:
// (1) data size: the aggregated size of all key-value not including meta data such as
// index, time range etc.
// (2) heap size: the aggregated size of all data that is allocated on-heap including all
// key-values that reside on-heap and the metadata that resides on-heap
// (3) off-heap size: the aggregated size of all data that is allocated off-heap including all
// key-values that reside off-heap and the metadata that resides off-heap
//
// 3 examples to illustrate their usage:
// Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated
// on-heap. The counters are <100MB, 120MB, 0>, respectively.
// Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata
// allocated on-heap (e.g, CAM index). The counters are <100MB, 20MB, 100MB>, respectively.
// Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB
// are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated
// off-heap (e.g, CCM index) and 5MB allocated on-heap (e.g, CSLM index in active).
// The counters are <100MB, 10MB, 110MB>, respectively.
/** /**
*'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can *'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can
* be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap * be in on heap or off heap area depending on the MSLAB and its configuration to be using on
* or off heap LABs * heap or off heap LABs
*/ */
protected volatile long dataSize; private final long dataSize;
/** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead. /**'getHeapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
* When Cells in on heap area, this will include the cells data size as well. * When Cells in on heap area, this will include the cells data size as well.
*/ */
protected volatile long heapSize; private final long heapSize;
/** off-heap size: the aggregated size of all data that is allocated off-heap including all /** off-heap size: the aggregated size of all data that is allocated off-heap including all
* key-values that reside off-heap and the metadata that resides off-heap * key-values that reside off-heap and the metadata that resides off-heap
*/ */
protected volatile long offHeapSize; private final long offHeapSize;
public MemStoreSize() { /**
* Package private constructor.
*/
MemStoreSize() {
this(0L, 0L, 0L); this(0L, 0L, 0L);
} }
public MemStoreSize(long dataSize, long heapSize, long offHeapSize) { /**
* Package private constructor.
*/
MemStoreSize(long dataSize, long heapSize, long offHeapSize) {
this.dataSize = dataSize; this.dataSize = dataSize;
this.heapSize = heapSize; this.heapSize = heapSize;
this.offHeapSize = offHeapSize; this.offHeapSize = offHeapSize;
} }
protected MemStoreSize(MemStoreSize memStoreSize) { /**
this.dataSize = memStoreSize.dataSize; * Package private constructor.
this.heapSize = memStoreSize.heapSize; */
this.offHeapSize = memStoreSize.offHeapSize; MemStoreSize(MemStoreSize memStoreSize) {
this.dataSize = memStoreSize.getDataSize();
this.heapSize = memStoreSize.getHeapSize();
this.offHeapSize = memStoreSize.getOffHeapSize();
} }
public boolean isEmpty() { public boolean isEmpty() {
return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0; return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0;
} }
@ -101,24 +93,22 @@ public class MemStoreSize {
if (!(obj instanceof MemStoreSize)) { if (!(obj instanceof MemStoreSize)) {
return false; return false;
} }
MemStoreSize other = (MemStoreSize) obj; MemStoreSize other = (MemStoreSize)obj;
return this.dataSize == other.dataSize return this.dataSize == other.dataSize && this.heapSize == other.heapSize &&
&& this.heapSize == other.heapSize this.offHeapSize == other.offHeapSize;
&& this.offHeapSize == other.offHeapSize;
} }
@Override @Override
public int hashCode() { public int hashCode() {
long h = 13 * this.dataSize; long h = 31 * this.dataSize;
h = h + 14 * this.heapSize; h = h + 31 * this.heapSize;
h = h + 15 * this.offHeapSize; h = h + 31 * this.offHeapSize;
return (int) h; return (int) h;
} }
@Override @Override
public String toString() { public String toString() {
return "dataSize=" + this.dataSize return "dataSize=" + this.dataSize + ", getHeapSize=" + this.heapSize +
+ " , heapSize=" + this.heapSize ", getOffHeapSize=" + this.offHeapSize;
+ " , offHeapSize=" + this.offHeapSize;
} }
} }

View File

@ -21,61 +21,96 @@ import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Accounting of current heap and data sizes. * Accounting of current heap and data sizes.
* Allows read/write on data/heap size as opposed to {@Link MemStoreSize} which is read-only. * Tracks 3 sizes:
* For internal use. * <ol>
* @see MemStoreSize * <li></li>data size: the aggregated size of all key-value not including meta data such as
* index, time range etc.
* </li>
* <li>heap size: the aggregated size of all data that is allocated on-heap including all
* key-values that reside on-heap and the metadata that resides on-heap
* </li>
* <li></li>off-heap size: the aggregated size of all data that is allocated off-heap including all
* key-values that reside off-heap and the metadata that resides off-heap
* </li>
* </ol>
*
* 3 examples to illustrate their usage:
* <p>
* Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated
* on-heap. The counters are <100MB, 120MB, 0>, respectively.
* </p>
* <p>Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata
* allocated on-heap (e.g, CAM index). The counters are <100MB, 20MB, 100MB>, respectively.
* </p>
* <p>
* Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB
* are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated
* off-heap (e.g, CCM index) and 5MB allocated on-heap (e.g, CSLM index in active).
* The counters are <100MB, 10MB, 110MB>, respectively.
* </p>
*
* Like {@link TimeRangeTracker}, it has thread-safe and non-thread-safe implementations.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class MemStoreSizing extends MemStoreSize { public interface MemStoreSizing {
public static final MemStoreSizing DUD = new MemStoreSizing() { static final MemStoreSizing DUD = new MemStoreSizing() {
private final MemStoreSize mss = new MemStoreSize();
@Override public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, @Override
long offHeapSizeDelta) { public MemStoreSize getMemStoreSize() {
throw new RuntimeException("I'm a dud, you can't use me!"); return this.mss;
} }
@Override public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, @Override
public long getDataSize() {
return this.mss.getDataSize();
}
@Override
public long getHeapSize() {
return this.mss.getHeapSize();
}
@Override
public long getOffHeapSize() {
return this.mss.getOffHeapSize();
}
@Override
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
long offHeapSizeDelta) { long offHeapSizeDelta) {
throw new RuntimeException("I'm a dud, you can't use me!"); throw new RuntimeException("I'm a DUD, you can't use me!");
} }
}; };
public MemStoreSizing() { /**
super(); * @return The new dataSize ONLY as a convenience
*/
long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta);
default long incMemStoreSize(MemStoreSize delta) {
return incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
} }
public MemStoreSizing(long dataSize, long heapSize, long offHeapSize) { /**
super(dataSize, heapSize, offHeapSize); * @return The new dataSize ONLY as a convenience
*/
default long decMemStoreSize(long dataSizeDelta, long heapSizeDelta,
long offHeapSizeDelta) {
return incMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
} }
public MemStoreSizing(MemStoreSize memStoreSize) { default long decMemStoreSize(MemStoreSize delta) {
super(memStoreSize); return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), -delta.getOffHeapSize());
} }
public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { long getDataSize();
this.dataSize += dataSizeDelta; long getHeapSize();
this.heapSize += heapSizeDelta; long getOffHeapSize();
this.offHeapSize += offHeapSizeDelta;
}
public void incMemStoreSize(MemStoreSize delta) {
incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
}
public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
this.dataSize -= dataSizeDelta;
this.heapSize -= heapSizeDelta;
this.offHeapSize -= offHeapSizeDelta;
}
public void decMemStoreSize(MemStoreSize delta) {
decMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
}
public void empty() {
this.dataSize = 0L;
this.heapSize = 0L;
this.offHeapSize = 0L;
}
/**
* @return Use this datastructure to return all three settings, {@link #getDataSize()},
* {@link #getHeapSize()}, and {@link #getOffHeapSize()}, in the one go.
*/
MemStoreSize getMemStoreSize();
} }

View File

@ -44,7 +44,7 @@ public class MutableSegment extends Segment {
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC)); super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
incSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata incMemStoreSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata
} }
/** /**
@ -89,7 +89,7 @@ public class MutableSegment extends Segment {
int cellLen = getCellLength(cur); int cellLen = getCellLength(cur);
long heapSize = heapSizeChange(cur, true); long heapSize = heapSizeChange(cur, true);
long offHeapSize = offHeapSizeChange(cur, true); long offHeapSize = offHeapSizeChange(cur, true);
this.incSize(-cellLen, -heapSize, -offHeapSize); incMemStoreSize(-cellLen, -heapSize, -offHeapSize);
if (memStoreSizing != null) { if (memStoreSizing != null) {
memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize); memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize);
} }

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Accounting of current heap and data sizes.
* <em>NOT THREAD SAFE</em>.
* Use in a 'local' context only where just a single-thread is updating. No concurrency!
* Used, for example, when summing all Cells in a single batch where result is then applied to the
* Store.
* @see ThreadSafeMemStoreSizing
*/
@InterfaceAudience.Private
class NonThreadSafeMemStoreSizing implements MemStoreSizing {
private long dataSize = 0;
private long heapSize = 0;
private long offHeapSize = 0;
NonThreadSafeMemStoreSizing() {
this(0, 0, 0);
}
NonThreadSafeMemStoreSizing(MemStoreSize mss) {
this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
}
NonThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
incMemStoreSize(dataSize, heapSize, offHeapSize);
}
@Override
public MemStoreSize getMemStoreSize() {
return new MemStoreSize(this.dataSize, this.heapSize, this.offHeapSize);
}
@Override
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
long offHeapSizeDelta) {
this.offHeapSize += offHeapSizeDelta;
this.heapSize += heapSizeDelta;
this.dataSize += dataSizeDelta;
return this.dataSize;
}
@Override
public long getDataSize() {
return dataSize;
}
@Override
public long getHeapSize() {
return heapSize;
}
@Override
public long getOffHeapSize() {
return offHeapSize;
}
@Override
public String toString() {
return getMemStoreSize().toString();
}
}

View File

@ -131,20 +131,20 @@ public class RegionServerAccounting {
return this.globalMemStoreOffHeapSize.sum(); return this.globalMemStoreOffHeapSize.sum();
} }
/** void incGlobalMemStoreSize(MemStoreSize mss) {
* @param memStoreSize the Memstore size will be added to incGlobalMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
* the global Memstore size
*/
public void incGlobalMemStoreSize(MemStoreSize memStoreSize) {
globalMemStoreDataSize.add(memStoreSize.getDataSize());
globalMemStoreHeapSize.add(memStoreSize.getHeapSize());
globalMemStoreOffHeapSize.add(memStoreSize.getOffHeapSize());
} }
public void decGlobalMemStoreSize(MemStoreSize memStoreSize) { public void incGlobalMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
globalMemStoreDataSize.add(-memStoreSize.getDataSize()); globalMemStoreDataSize.add(dataSizeDelta);
globalMemStoreHeapSize.add(-memStoreSize.getHeapSize()); globalMemStoreHeapSize.add(heapSizeDelta);
globalMemStoreOffHeapSize.add(-memStoreSize.getOffHeapSize()); globalMemStoreOffHeapSize.add(offHeapSizeDelta);
}
public void decGlobalMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
globalMemStoreDataSize.add(-dataSizeDelta);
globalMemStoreHeapSize.add(-heapSizeDelta);
globalMemStoreOffHeapSize.add(-offHeapSizeDelta);
} }
/** /**
@ -231,7 +231,7 @@ public class RegionServerAccounting {
// the region open operation. No need to handle multi thread issues on one region's entry in // the region open operation. No need to handle multi thread issues on one region's entry in
// this Map. // this Map.
if (replayEdistsSize == null) { if (replayEdistsSize == null) {
replayEdistsSize = new MemStoreSizing(); replayEdistsSize = new ThreadSafeMemStoreSizing();
replayEditsPerRegion.put(regionName, replayEdistsSize); replayEditsPerRegion.put(regionName, replayEdistsSize);
} }
replayEdistsSize.incMemStoreSize(memStoreSize); replayEdistsSize.incMemStoreSize(memStoreSize);
@ -244,10 +244,11 @@ public class RegionServerAccounting {
* @param regionName the region which could not open. * @param regionName the region which could not open.
*/ */
public void rollbackRegionReplayEditsSize(byte[] regionName) { public void rollbackRegionReplayEditsSize(byte[] regionName) {
MemStoreSize replayEditsSize = replayEditsPerRegion.get(regionName); MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName);
if (replayEditsSize != null) { if (replayEditsSizing != null) {
clearRegionReplayEditsSize(regionName); clearRegionReplayEditsSize(regionName);
decGlobalMemStoreSize(replayEditsSize); decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(),
replayEditsSizing.getOffHeapSize());
} }
} }

View File

@ -65,8 +65,8 @@ public class RegionServicesForStores {
region.unblockUpdates(); region.unblockUpdates();
} }
public void addMemStoreSize(MemStoreSize size) { public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
region.incMemStoreSize(size); region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
} }
public RegionInfo getRegionInfo() { public RegionInfo getRegionInfo() {

View File

@ -553,9 +553,6 @@ public class ScannerContext {
/** /**
* Set all fields together. * Set all fields together.
* @param batch
* @param sizeScope
* @param dataSize
*/ */
void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize, void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize,
LimitScope timeScope, long time) { LimitScope timeScope, long time) {

View File

@ -45,7 +45,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* segments from active set to snapshot set in the default implementation. * segments from active set to snapshot set in the default implementation.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class Segment { public abstract class Segment implements MemStoreSizing {
public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, memStoreSizing, + 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, memStoreSizing,
@ -59,9 +59,9 @@ public abstract class Segment {
private final CellComparator comparator; private final CellComparator comparator;
protected long minSequenceId; protected long minSequenceId;
private MemStoreLAB memStoreLAB; private MemStoreLAB memStoreLAB;
// Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not // Sum of sizes of all Cells added to this Segment. Cell's HeapSize is considered. This is not
// including the heap overhead of this class. // including the heap overhead of this class.
protected final MemStoreSizing segmentSize; protected final MemStoreSizing memStoreSizing;
protected final TimeRangeTracker timeRangeTracker; protected final TimeRangeTracker timeRangeTracker;
protected volatile boolean tagsPresent; protected volatile boolean tagsPresent;
@ -69,7 +69,9 @@ public abstract class Segment {
// and there is no need in true Segments state // and there is no need in true Segments state
protected Segment(CellComparator comparator, TimeRangeTracker trt) { protected Segment(CellComparator comparator, TimeRangeTracker trt) {
this.comparator = comparator; this.comparator = comparator;
this.segmentSize = new MemStoreSizing(); // Do we need to be thread safe always? What if ImmutableSegment?
// DITTO for the TimeRangeTracker below.
this.memStoreSizing = new ThreadSafeMemStoreSizing();
this.timeRangeTracker = trt; this.timeRangeTracker = trt;
} }
@ -85,7 +87,9 @@ public abstract class Segment {
OffHeapSize += memStoreSize.getOffHeapSize(); OffHeapSize += memStoreSize.getOffHeapSize();
} }
this.comparator = comparator; this.comparator = comparator;
this.segmentSize = new MemStoreSizing(dataSize, heapSize, OffHeapSize); // Do we need to be thread safe always? What if ImmutableSegment?
// DITTO for the TimeRangeTracker below.
this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize);
this.timeRangeTracker = trt; this.timeRangeTracker = trt;
} }
@ -95,7 +99,9 @@ public abstract class Segment {
this.comparator = comparator; this.comparator = comparator;
this.minSequenceId = Long.MAX_VALUE; this.minSequenceId = Long.MAX_VALUE;
this.memStoreLAB = memStoreLAB; this.memStoreLAB = memStoreLAB;
this.segmentSize = new MemStoreSizing(); // Do we need to be thread safe always? What if ImmutableSegment?
// DITTO for the TimeRangeTracker below.
this.memStoreSizing = new ThreadSafeMemStoreSizing();
this.tagsPresent = false; this.tagsPresent = false;
this.timeRangeTracker = trt; this.timeRangeTracker = trt;
} }
@ -105,7 +111,7 @@ public abstract class Segment {
this.comparator = segment.getComparator(); this.comparator = segment.getComparator();
this.minSequenceId = segment.getMinSequenceId(); this.minSequenceId = segment.getMinSequenceId();
this.memStoreLAB = segment.getMemStoreLAB(); this.memStoreLAB = segment.getMemStoreLAB();
this.segmentSize = new MemStoreSizing(segment.getMemStoreSize()); this.memStoreSizing = new ThreadSafeMemStoreSizing(segment.memStoreSizing.getMemStoreSize());
this.tagsPresent = segment.isTagsPresent(); this.tagsPresent = segment.isTagsPresent();
this.timeRangeTracker = segment.getTimeRangeTracker(); this.timeRangeTracker = segment.getTimeRangeTracker();
} }
@ -213,39 +219,29 @@ public abstract class Segment {
return this; return this;
} }
@Override
public MemStoreSize getMemStoreSize() { public MemStoreSize getMemStoreSize() {
return this.segmentSize; return this.memStoreSizing.getMemStoreSize();
} }
/** @Override
* @return Sum of all cell's size. public long getDataSize() {
*/ return this.memStoreSizing.getDataSize();
public long keySize() {
return this.segmentSize.getDataSize();
} }
/** @Override
* @return The heap size of this segment. public long getHeapSize() {
*/ return this.memStoreSizing.getHeapSize();
public long heapSize() {
return this.segmentSize.getHeapSize();
} }
/** @Override
* @return The off-heap size of this segment. public long getOffHeapSize() {
*/ return this.memStoreSizing.getOffHeapSize();
public long offHeapSize() {
return this.segmentSize.getOffHeapSize();
} }
/** @Override
* Updates the size counters of the segment by the given delta public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) {
*/ return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
//TODO
protected void incSize(long delta, long heapOverhead, long offHeapOverhead) {
synchronized (this) {
this.segmentSize.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
}
} }
public long getMinSequenceId() { public long getMinSequenceId() {
@ -308,7 +304,7 @@ public abstract class Segment {
} }
long heapSize = heapSizeChange(cellToAdd, succ); long heapSize = heapSizeChange(cellToAdd, succ);
long offHeapSize = offHeapSizeChange(cellToAdd, succ); long offHeapSize = offHeapSizeChange(cellToAdd, succ);
incSize(cellSize, heapSize, offHeapSize); incMemStoreSize(cellSize, heapSize, offHeapSize);
if (memstoreSizing != null) { if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize); memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize);
} }
@ -408,8 +404,8 @@ public abstract class Segment {
String res = "type=" + this.getClass().getSimpleName() + ", "; String res = "type=" + this.getClass().getSimpleName() + ", ";
res += "empty=" + (isEmpty()? "yes": "no") + ", "; res += "empty=" + (isEmpty()? "yes": "no") + ", ";
res += "cellCount=" + getCellsCount() + ", "; res += "cellCount=" + getCellsCount() + ", ";
res += "cellSize=" + keySize() + ", "; res += "cellSize=" + getDataSize() + ", ";
res += "totalHeapSize=" + heapSize() + ", "; res += "totalHeapSize=" + getHeapSize() + ", ";
res += "min timestamp=" + timeRangeTracker.getMin() + ", "; res += "min timestamp=" + timeRangeTracker.getMin() + ", ";
res += "max timestamp=" + timeRangeTracker.getMax(); res += "max timestamp=" + timeRangeTracker.getMax();
return res; return res;

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Accounting of current heap and data sizes.
* Thread-safe. Many threads can do updates against this single instance.
* @see NonThreadSafeMemStoreSizing
* @see MemStoreSize
*/
@InterfaceAudience.Private
class ThreadSafeMemStoreSizing implements MemStoreSizing {
// We used to tie the update of these thread counters so
// they all changed together under one lock. This was
// undone. Doesn't seem necessary.
private final AtomicLong dataSize = new AtomicLong();
private final AtomicLong heapSize = new AtomicLong();
private final AtomicLong offHeapSize = new AtomicLong();
ThreadSafeMemStoreSizing() {
this(0, 0, 0);
}
ThreadSafeMemStoreSizing(MemStoreSize mss) {
this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
}
ThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
incMemStoreSize(dataSize, heapSize, offHeapSize);
}
public MemStoreSize getMemStoreSize() {
return new MemStoreSize(getDataSize(), getHeapSize(), getOffHeapSize());
}
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
this.offHeapSize.addAndGet(offHeapSizeDelta);
this.heapSize.addAndGet(heapSizeDelta);
return this.dataSize.addAndGet(dataSizeDelta);
}
@Override
public long getDataSize() {
return dataSize.get();
}
@Override
public long getHeapSize() {
return heapSize.get();
}
@Override
public long getOffHeapSize() {
return offHeapSize.get();
}
@Override
public String toString() {
return getMemStoreSize().toString();
}
}

View File

@ -355,7 +355,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
private long runSnapshot(final AbstractMemStore hmc, boolean useForce) private long runSnapshot(final AbstractMemStore hmc, boolean useForce)
throws IOException { throws IOException {
// Save off old state. // Save off old state.
long oldHistorySize = hmc.getSnapshot().keySize(); long oldHistorySize = hmc.getSnapshot().getDataSize();
long prevTimeStamp = hmc.timeOfOldestEdit(); long prevTimeStamp = hmc.timeOfOldestEdit();
hmc.snapshot(); hmc.snapshot();
@ -616,9 +616,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher // simulate flusher
region.decrMemStoreSize(mss);
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount()); assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -667,7 +668,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
@ -675,9 +676,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
+ 7 * oneCellOnCAHeapSize; + 7 * oneCellOnCAHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
size = memstore.getFlushableSize(); mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher // simulate flusher
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(7, s.getCellsCount()); assertEquals(7, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -722,7 +724,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
((MyCompactingMemStore) memstore).disableCompaction(); ((MyCompactingMemStore) memstore).disableCompaction();
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
// No change in the cells data size. ie. memstore size. as there is no compaction. // No change in the cells data size. ie. memstore size. as there is no compaction.
@ -738,7 +740,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
((MyCompactingMemStore)memstore).enableCompaction(); ((MyCompactingMemStore)memstore).enableCompaction();
size = memstore.getFlushableSize(); mss = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
// active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
@ -751,9 +753,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
+ CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
size = memstore.getFlushableSize(); mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher // simulate flusher
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount()); assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -811,9 +814,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertTrue(4 == numCells || 11 == numCells); assertTrue(4 == numCells || 11 == numCells);
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher // simulate flusher
region.decrMemStoreSize(mss);
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
numCells = s.getCellsCount(); numCells = s.getCellsCount();
assertTrue(4 == numCells || 11 == numCells); assertTrue(4 == numCells || 11 == numCells);
@ -825,8 +829,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier"); byte[] qf = Bytes.toBytes("testqualifier");
long size = hmc.getActive().keySize(); long size = hmc.getActive().getDataSize();
long heapOverhead = hmc.getActive().heapSize(); long heapOverhead = hmc.getActive().getHeapSize();
int totalLen = 0; int totalLen = 0;
for (int i = 0; i < keys.length; i++) { for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
@ -838,8 +842,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
hmc.add(kv, null); hmc.add(kv, null);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
} }
regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size, regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
hmc.getActive().heapSize() - heapOverhead, 0)); hmc.getActive().getHeapSize() - heapOverhead, 0);
return totalLen; return totalLen;
} }
@ -847,8 +851,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier"); byte[] qf = Bytes.toBytes("testqualifier");
long size = hmc.getActive().keySize(); long size = hmc.getActive().getDataSize();
long heapOverhead = hmc.getActive().heapSize(); long heapOverhead = hmc.getActive().getHeapSize();
int totalLen = 0; int totalLen = 0;
for (int i = 0; i < keys.length; i++) { for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
@ -859,8 +863,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
hmc.add(kv, null); hmc.add(kv, null);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
} }
regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size, regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
hmc.getActive().heapSize() - heapOverhead, 0)); hmc.getActive().getHeapSize() - heapOverhead, 0);
return totalLen; return totalLen;
} }

View File

@ -132,9 +132,9 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
counter += s.getCellsCount(); counter += s.getCellsCount();
} }
assertEquals(3, counter); assertEquals(3, counter);
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher region.decrMemStoreSize(mss); // simulate flusher
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(3, s.getCellsCount()); assertEquals(3, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -194,9 +194,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
totalHeapSize2 = 1 * cellAfterFlushSize; totalHeapSize2 = 1 * cellAfterFlushSize;
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher // simulate flusher
region.decrMemStoreSize(mss);
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount()); assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -224,7 +225,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalCellsLen1, region.getMemStoreDataSize()); assertEquals(totalCellsLen1, region.getMemStoreDataSize());
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
@ -245,7 +246,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
((MyCompactingMemStore) memstore).disableCompaction(); ((MyCompactingMemStore) memstore).disableCompaction();
size = memstore.getFlushableSize(); mss = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
@ -260,7 +261,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
((CompactingMemStore) memstore).heapSize()); ((CompactingMemStore) memstore).heapSize());
((MyCompactingMemStore) memstore).enableCompaction(); ((MyCompactingMemStore) memstore).enableCompaction();
size = memstore.getFlushableSize(); mss = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10); Threads.sleep(10);
@ -279,9 +280,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());
size = memstore.getFlushableSize(); mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher // simulate flusher
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount()); assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -653,9 +655,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher // simulate flusher
region.decrMemStoreSize(mss);
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(numOfCells, s.getCellsCount()); assertEquals(numOfCells, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -725,9 +728,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher // simulate flusher
region.decrMemStoreSize(mss);
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(numOfCells, s.getCellsCount()); assertEquals(numOfCells, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -799,9 +803,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher // simulate flusher
region.decrMemStoreSize(mss);
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(numOfCells, s.getCellsCount()); assertEquals(numOfCells, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -893,7 +898,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) { private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier"); byte[] qf = Bytes.toBytes("testqualifier");
MemStoreSizing memstoreSizing = new MemStoreSizing(); MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
for (int i = 0; i < keys.length; i++) { for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
Threads.sleep(1); // to make sure each kv gets a different ts Threads.sleep(1); // to make sure each kv gets a different ts
@ -903,8 +908,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
hmc.add(kv, memstoreSizing); hmc.add(kv, memstoreSizing);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
} }
regionServicesForStores.addMemStoreSize(memstoreSizing); MemStoreSize mss = memstoreSizing.getMemStoreSize();
return memstoreSizing.getDataSize(); regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(),
mss.getOffHeapSize());
return mss.getDataSize();
} }
private long cellBeforeFlushSize() { private long cellBeforeFlushSize() {

View File

@ -129,17 +129,18 @@ public class TestDefaultMemStore {
public void testPutSameCell() { public void testPutSameCell() {
byte[] bytes = Bytes.toBytes(getName()); byte[] bytes = Bytes.toBytes(getName());
KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes); KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
MemStoreSizing sizeChangeForFirstCell = new MemStoreSizing(); MemStoreSizing sizeChangeForFirstCell = new NonThreadSafeMemStoreSizing();
this.memstore.add(kv, sizeChangeForFirstCell); this.memstore.add(kv, sizeChangeForFirstCell);
MemStoreSizing sizeChangeForSecondCell = new MemStoreSizing(); MemStoreSizing sizeChangeForSecondCell = new NonThreadSafeMemStoreSizing();
this.memstore.add(kv, sizeChangeForSecondCell); this.memstore.add(kv, sizeChangeForSecondCell);
// make sure memstore size increase won't double-count MSLAB chunk size // make sure memstore size increase won't double-count MSLAB chunk size
assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getDataSize()); assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getMemStoreSize().getDataSize());
Segment segment = this.memstore.getActive(); Segment segment = this.memstore.getActive();
MemStoreLAB msLab = segment.getMemStoreLAB(); MemStoreLAB msLab = segment.getMemStoreLAB();
if (msLab != null) { if (msLab != null) {
// make sure memstore size increased even when writing the same cell, if using MSLAB // make sure memstore size increased even when writing the same cell, if using MSLAB
assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize()); assertEquals(Segment.getCellLength(kv),
sizeChangeForSecondCell.getMemStoreSize().getDataSize());
// make sure chunk size increased even when writing the same cell, if using MSLAB // make sure chunk size increased even when writing the same cell, if using MSLAB
if (msLab instanceof MemStoreLABImpl) { if (msLab instanceof MemStoreLABImpl) {
// since we add the chunkID at the 0th offset of the chunk and the // since we add the chunkID at the 0th offset of the chunk and the
@ -149,8 +150,8 @@ public class TestDefaultMemStore {
} }
} else { } else {
// make sure no memstore size change w/o MSLAB // make sure no memstore size change w/o MSLAB
assertEquals(0, sizeChangeForSecondCell.getDataSize()); assertEquals(0, sizeChangeForSecondCell.getMemStoreSize().getDataSize());
assertEquals(0, sizeChangeForSecondCell.getHeapSize()); assertEquals(0, sizeChangeForSecondCell.getMemStoreSize().getHeapSize());
} }
} }

View File

@ -361,8 +361,9 @@ public class TestHRegion {
} finally { } finally {
assertTrue("The regionserver should have thrown an exception", threwIOE); assertTrue("The regionserver should have thrown an exception", threwIOE);
} }
long sz = store.getFlushableSize().getDataSize(); MemStoreSize mss = store.getFlushableSize();
assertTrue("flushable size should be zero, but it is " + sz, sz == 0); assertTrue("flushable size should be zero, but it is " + mss,
mss.getDataSize() == 0);
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
} }
@ -414,9 +415,10 @@ public class TestHRegion {
} catch (IOException expected) { } catch (IOException expected) {
} }
long expectedSize = onePutSize * 2; long expectedSize = onePutSize * 2;
assertEquals("memstoreSize should be incremented", expectedSize, region.getMemStoreDataSize()); assertEquals("memstoreSize should be incremented",
assertEquals("flushable size should be incremented", expectedSize, expectedSize, region.getMemStoreDataSize());
store.getFlushableSize().getDataSize()); assertEquals("flushable size should be incremented",
expectedSize, store.getFlushableSize().getDataSize());
region.setCoprocessorHost(null); region.setCoprocessorHost(null);
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);

View File

@ -367,7 +367,7 @@ public class TestHRegionReplayEvents {
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
long storeFlushableSize = store.getFlushableSize().getHeapSize(); MemStoreSize mss = store.getFlushableSize();
long storeSize = store.getSize(); long storeSize = store.getSize();
long storeSizeUncompressed = store.getStoreSizeUncompressed(); long storeSizeUncompressed = store.getStoreSizeUncompressed();
if (flushDesc.getAction() == FlushAction.START_FLUSH) { if (flushDesc.getAction() == FlushAction.START_FLUSH) {
@ -391,8 +391,8 @@ public class TestHRegionReplayEvents {
for (HStore s : secondaryRegion.getStores()) { for (HStore s : secondaryRegion.getStores()) {
assertEquals(expectedStoreFileCount, s.getStorefilesCount()); assertEquals(expectedStoreFileCount, s.getStorefilesCount());
} }
long newFlushableSize = store.getFlushableSize().getHeapSize(); MemStoreSize newMss = store.getFlushableSize();
assertTrue(storeFlushableSize > newFlushableSize); assertTrue(mss.getHeapSize() > newMss.getHeapSize());
// assert that the region memstore is smaller now // assert that the region memstore is smaller now
long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
@ -466,7 +466,7 @@ public class TestHRegionReplayEvents {
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
long storeFlushableSize = store.getFlushableSize().getHeapSize(); MemStoreSize mss = store.getFlushableSize();
if (flushDesc.getAction() == FlushAction.START_FLUSH) { if (flushDesc.getAction() == FlushAction.START_FLUSH) {
startFlushDesc = flushDesc; startFlushDesc = flushDesc;
@ -475,7 +475,7 @@ public class TestHRegionReplayEvents {
assertNull(result.result); assertNull(result.result);
assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
assertTrue(regionMemstoreSize > 0); assertTrue(regionMemstoreSize > 0);
assertTrue(storeFlushableSize > 0); assertTrue(mss.getHeapSize() > 0);
// assert that the store memstore is smaller now // assert that the store memstore is smaller now
long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
@ -616,8 +616,8 @@ public class TestHRegionReplayEvents {
assertEquals(expectedStoreFileCount, s.getStorefilesCount()); assertEquals(expectedStoreFileCount, s.getStorefilesCount());
} }
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long newFlushableSize = store.getFlushableSize().getHeapSize(); MemStoreSize mss = store.getFlushableSize();
assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
// assert that the region memstore is same as before // assert that the region memstore is same as before
long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
@ -706,8 +706,8 @@ public class TestHRegionReplayEvents {
assertEquals(expectedStoreFileCount, s.getStorefilesCount()); assertEquals(expectedStoreFileCount, s.getStorefilesCount());
} }
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long newFlushableSize = store.getFlushableSize().getHeapSize(); MemStoreSize mss = store.getFlushableSize();
assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
// assert that the region memstore is smaller than before, but not empty // assert that the region memstore is smaller than before, but not empty
long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
@ -811,12 +811,12 @@ public class TestHRegionReplayEvents {
assertEquals(expectedStoreFileCount, s.getStorefilesCount()); assertEquals(expectedStoreFileCount, s.getStorefilesCount());
} }
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long newFlushableSize = store.getFlushableSize().getHeapSize(); MemStoreSize mss = store.getFlushableSize();
if (droppableMemstore) { if (droppableMemstore) {
// assert that the memstore is dropped // assert that the memstore is dropped
assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD); assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
} else { } else {
assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
} }
// assert that the region memstore is same as before (we could not drop) // assert that the region memstore is same as before (we could not drop)
@ -903,8 +903,8 @@ public class TestHRegionReplayEvents {
assertEquals(expectedStoreFileCount, s.getStorefilesCount()); assertEquals(expectedStoreFileCount, s.getStorefilesCount());
} }
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long newFlushableSize = store.getFlushableSize().getHeapSize(); MemStoreSize mss = store.getFlushableSize();
assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD); assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
// assert that the region memstore is empty // assert that the region memstore is empty
long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();

View File

@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
*/ */
@Category({VerySlowRegionServerTests.class, LargeTests.class}) @Category({VerySlowRegionServerTests.class, LargeTests.class})
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public class TestHRegionWithInMemoryFlush extends TestHRegion{ public class TestHRegionWithInMemoryFlush extends TestHRegion {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =

View File

@ -240,8 +240,7 @@ public class TestHStore {
*/ */
@Test @Test
public void testFlushSizeSizing() throws Exception { public void testFlushSizeSizing() throws Exception {
LOG.info("Setting up a faulty file system that cannot write in " + LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
this.name.getMethodName());
final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
// Only retry once. // Only retry once.
conf.setInt("hbase.hstore.flush.retries.number", 1); conf.setInt("hbase.hstore.flush.retries.number", 1);
@ -260,15 +259,15 @@ public class TestHStore {
// Initialize region // Initialize region
init(name.getMethodName(), conf); init(name.getMethodName(), conf);
MemStoreSize size = store.memstore.getFlushableSize(); MemStoreSize mss = store.memstore.getFlushableSize();
assertEquals(0, size.getDataSize()); assertEquals(0, mss.getDataSize());
LOG.info("Adding some data"); LOG.info("Adding some data");
MemStoreSizing kvSize = new MemStoreSizing(); MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
// add the heap size of active (mutable) segment // add the heap size of active (mutable) segment
kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0); kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0);
size = store.memstore.getFlushableSize(); mss = store.memstore.getFlushableSize();
assertEquals(kvSize, size); assertEquals(kvSize.getMemStoreSize(), mss);
// Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
try { try {
LOG.info("Flushing"); LOG.info("Flushing");
@ -280,23 +279,23 @@ public class TestHStore {
// due to snapshot, change mutable to immutable segment // due to snapshot, change mutable to immutable segment
kvSize.incMemStoreSize(0, kvSize.incMemStoreSize(0,
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD, 0); CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD, 0);
size = store.memstore.getFlushableSize(); mss = store.memstore.getFlushableSize();
assertEquals(kvSize, size); assertEquals(kvSize.getMemStoreSize(), mss);
MemStoreSizing kvSize2 = new MemStoreSizing(); MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2); store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0); kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0);
// Even though we add a new kv, we expect the flushable size to be 'same' since we have // Even though we add a new kv, we expect the flushable size to be 'same' since we have
// not yet cleared the snapshot -- the above flush failed. // not yet cleared the snapshot -- the above flush failed.
assertEquals(kvSize, size); assertEquals(kvSize.getMemStoreSize(), mss);
ffs.fault.set(false); ffs.fault.set(false);
flushStore(store, id++); flushStore(store, id++);
size = store.memstore.getFlushableSize(); mss = store.memstore.getFlushableSize();
// Size should be the foreground kv size. // Size should be the foreground kv size.
assertEquals(kvSize2, size); assertEquals(kvSize2.getMemStoreSize(), mss);
flushStore(store, id++); flushStore(store, id++);
size = store.memstore.getFlushableSize(); mss = store.memstore.getFlushableSize();
assertEquals(0, size.getDataSize()); assertEquals(0, mss.getDataSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, size.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
return null; return null;
} }
}); });
@ -1226,7 +1225,7 @@ public class TestHStore {
byte[] value0 = Bytes.toBytes("value0"); byte[] value0 = Bytes.toBytes("value0");
byte[] value1 = Bytes.toBytes("value1"); byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2"); byte[] value2 = Bytes.toBytes("value2");
MemStoreSizing memStoreSizing = new MemStoreSizing(); MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime(); long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100; long seqId = 100;
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
@ -1285,7 +1284,7 @@ public class TestHStore {
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("value"); byte[] value = Bytes.toBytes("value");
MemStoreSizing memStoreSizing = new MemStoreSizing(); MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime(); long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100; long seqId = 100;
// older data whihc shouldn't be "seen" by client // older data whihc shouldn't be "seen" by client
@ -1363,7 +1362,7 @@ public class TestHStore {
}); });
byte[] oldValue = Bytes.toBytes("oldValue"); byte[] oldValue = Bytes.toBytes("oldValue");
byte[] currentValue = Bytes.toBytes("currentValue"); byte[] currentValue = Bytes.toBytes("currentValue");
MemStoreSizing memStoreSizing = new MemStoreSizing(); MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime(); long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100; long seqId = 100;
// older data whihc shouldn't be "seen" by client // older data whihc shouldn't be "seen" by client
@ -1479,7 +1478,7 @@ public class TestHStore {
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("thisisavarylargevalue"); byte[] value = Bytes.toBytes("thisisavarylargevalue");
MemStoreSizing memStoreSizing = new MemStoreSizing(); MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime(); long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100; long seqId = 100;
// older data whihc shouldn't be "seen" by client // older data whihc shouldn't be "seen" by client
@ -1601,7 +1600,7 @@ public class TestHStore {
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
// Set the lower threshold to invoke the "MERGE" policy // Set the lower threshold to invoke the "MERGE" policy
MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
MemStoreSizing memStoreSizing = new MemStoreSizing(); MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
long seqID = 1L; long seqID = 1L;
// Add some data to the region and do some flushes // Add some data to the region and do some flushes