HBASE-20411 Ameliorate MutableSegment synchronize
Change the MemStore size accounting so we don't synchronize across three volatiles applying deltas. Instead: + Make MemStoreSize, a datastructure of our memstore size longs, immutable. + Undo MemStoreSizing being an instance of MemStoreSize; instead it has-a. + Make two MemStoreSizing implementations; one thread-safe, the other not. + Let all memory sizing longs run independent, untied by synchronize (Huaxiang and Anoop suggestion) using atomiclongs. + Review all use of MemStoreSizing. Many are single-threaded and do not need to be synchronized; use the non-thread safe counter. TODO: Use this technique accounting at the global level too.
This commit is contained in:
parent
1f10ef553e
commit
5ac7740896
|
@ -165,13 +165,7 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
|
||||
@Override
|
||||
public MemStoreSize getSnapshotSize() {
|
||||
return getSnapshotSizing();
|
||||
}
|
||||
|
||||
MemStoreSizing getSnapshotSizing() {
|
||||
return new MemStoreSizing(this.snapshot.keySize(),
|
||||
this.snapshot.heapSize(),
|
||||
this.snapshot.offHeapSize());
|
||||
return this.snapshot.getMemStoreSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,7 +40,7 @@ public class CSLMImmutableSegment extends ImmutableSegment {
|
|||
super(segment);
|
||||
// update the segment metadata heap size
|
||||
long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM;
|
||||
incSize(0, indexOverhead, 0); // CSLM is always on-heap
|
||||
incMemStoreSize(0, indexOverhead, 0); // CSLM is always on-heap
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -45,7 +45,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
|
|||
protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
|
||||
MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
|
||||
super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
|
||||
incSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap
|
||||
incMemStoreSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap
|
||||
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
|
||||
initializeCellSet(numOfCells, iterator, action);
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
|
|||
MemStoreCompactionStrategy.Action action) {
|
||||
super(segment); // initiailize the upper class
|
||||
long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
|
||||
incSize(0, indexOverhead, 0); // CAM is always on-heap
|
||||
incMemStoreSize(0, indexOverhead, 0); // CAM is always on-heap
|
||||
int numOfCells = segment.getCellsCount();
|
||||
// build the new CellSet based on CellChunkMap and update the CellSet of this Segment
|
||||
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
|
||||
|
@ -67,7 +67,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
|
|||
// arrange the meta-data size, decrease all meta-data sizes related to SkipList;
|
||||
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
|
||||
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
|
||||
incSize(0, newSegmentSizeDelta, 0);
|
||||
incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
}
|
||||
|
||||
|
|
|
@ -58,9 +58,9 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
boolean onHeap = getMemStoreLAB().isOnHeap();
|
||||
// initiate the heapSize with the size of the segment metadata
|
||||
if(onHeap) {
|
||||
incSize(0, indexOverhead, 0);
|
||||
incMemStoreSize(0, indexOverhead, 0);
|
||||
} else {
|
||||
incSize(0, 0, indexOverhead);
|
||||
incMemStoreSize(0, 0, indexOverhead);
|
||||
}
|
||||
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
|
||||
initializeCellSet(numOfCells, iterator, action);
|
||||
|
@ -79,9 +79,9 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
boolean onHeap = getMemStoreLAB().isOnHeap();
|
||||
// initiate the heapSize with the size of the segment metadata
|
||||
if(onHeap) {
|
||||
incSize(0, indexOverhead, 0);
|
||||
incMemStoreSize(0, indexOverhead, 0);
|
||||
} else {
|
||||
incSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM);
|
||||
incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM);
|
||||
}
|
||||
int numOfCells = segment.getCellsCount();
|
||||
// build the new CellSet based on CellChunkMap
|
||||
|
@ -92,10 +92,10 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
// (reinitializeCellSet doesn't take the care for the sizes)
|
||||
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
|
||||
if(onHeap) {
|
||||
incSize(0, newSegmentSizeDelta, 0);
|
||||
incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
} else {
|
||||
incSize(0, 0, newSegmentSizeDelta);
|
||||
incMemStoreSize(0, 0, newSegmentSizeDelta);
|
||||
memstoreSizing.incMemStoreSize(0, 0, newSegmentSizeDelta);
|
||||
|
||||
}
|
||||
|
@ -333,7 +333,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
long heapOverhead = newHeapSize - oldHeapSize;
|
||||
long offHeapOverhead = newOffHeapSize - oldOffHeapSize;
|
||||
//TODO: maybe need to update the dataSize of the region
|
||||
incSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
|
||||
incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
|
||||
return cell;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,12 +155,12 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
*/
|
||||
@Override
|
||||
public MemStoreSize size() {
|
||||
MemStoreSizing memstoreSizing = new MemStoreSizing();
|
||||
MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
memstoreSizing.incMemStoreSize(active.getMemStoreSize());
|
||||
for (Segment item : pipeline.getSegments()) {
|
||||
memstoreSizing.incMemStoreSize(item.getMemStoreSize());
|
||||
}
|
||||
return memstoreSizing;
|
||||
return memstoreSizing.getMemStoreSize();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -216,42 +216,38 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
return new MemStoreSnapshot(snapshotId, this.snapshot);
|
||||
}
|
||||
|
||||
/**
|
||||
* On flush, how much memory we will clear.
|
||||
* @return size of data that is going to be flushed
|
||||
*/
|
||||
@Override
|
||||
public MemStoreSize getFlushableSize() {
|
||||
MemStoreSizing snapshotSizing = getSnapshotSizing();
|
||||
if (snapshotSizing.getDataSize() == 0) {
|
||||
MemStoreSize mss = getSnapshotSize();
|
||||
if (mss.getDataSize() == 0) {
|
||||
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
|
||||
if (compositeSnapshot) {
|
||||
snapshotSizing = pipeline.getPipelineSizing();
|
||||
snapshotSizing.incMemStoreSize(active.getMemStoreSize());
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
|
||||
memStoreSizing.incMemStoreSize(this.active.getMemStoreSize());
|
||||
mss = memStoreSizing.getMemStoreSize();
|
||||
} else {
|
||||
snapshotSizing = pipeline.getTailSizing();
|
||||
mss = pipeline.getTailSize();
|
||||
}
|
||||
}
|
||||
return snapshotSizing.getDataSize() > 0 ? snapshotSizing
|
||||
: new MemStoreSize(active.getMemStoreSize());
|
||||
return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long keySize() {
|
||||
// Need to consider keySize of all segments in pipeline and active
|
||||
long k = this.active.keySize();
|
||||
// Need to consider dataSize/keySize of all segments in pipeline and active
|
||||
long keySize = this.active.getDataSize();
|
||||
for (Segment segment : this.pipeline.getSegments()) {
|
||||
k += segment.keySize();
|
||||
keySize += segment.getDataSize();
|
||||
}
|
||||
return k;
|
||||
return keySize;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long heapSize() {
|
||||
// Need to consider heapOverhead of all segments in pipeline and active
|
||||
long h = this.active.heapSize();
|
||||
long h = this.active.getHeapSize();
|
||||
for (Segment segment : this.pipeline.getSegments()) {
|
||||
h += segment.heapSize();
|
||||
h += segment.getHeapSize();
|
||||
}
|
||||
return h;
|
||||
}
|
||||
|
@ -447,7 +443,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
|
||||
@VisibleForTesting
|
||||
protected boolean shouldFlushInMemory() {
|
||||
if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold
|
||||
if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold
|
||||
if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush
|
||||
return false; // regardless the size
|
||||
}
|
||||
|
@ -571,7 +567,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
|
||||
// debug method
|
||||
public void debug() {
|
||||
String msg = "active size=" + this.active.keySize();
|
||||
String msg = "active size=" + this.active.getDataSize();
|
||||
msg += " in-memory flush size is "+ inmemoryFlushSize;
|
||||
msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
|
||||
msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");
|
||||
|
|
|
@ -23,11 +23,12 @@ import java.util.Iterator;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
||||
/**
|
||||
* The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
|
||||
|
@ -135,19 +136,21 @@ public class CompactionPipeline {
|
|||
// update the global memstore size counter
|
||||
long suffixDataSize = getSegmentsKeySize(suffix);
|
||||
long newDataSize = 0;
|
||||
if(segment != null) newDataSize = segment.keySize();
|
||||
if(segment != null) {
|
||||
newDataSize = segment.getDataSize();
|
||||
}
|
||||
long dataSizeDelta = suffixDataSize - newDataSize;
|
||||
long suffixHeapSize = getSegmentsHeapSize(suffix);
|
||||
long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
|
||||
long newHeapSize = 0;
|
||||
long newOffHeapSize = 0;
|
||||
if(segment != null) {
|
||||
newHeapSize = segment.heapSize();
|
||||
newOffHeapSize = segment.offHeapSize();
|
||||
newHeapSize = segment.getHeapSize();
|
||||
newOffHeapSize = segment.getOffHeapSize();
|
||||
}
|
||||
long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
|
||||
long heapSizeDelta = suffixHeapSize - newHeapSize;
|
||||
region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta));
|
||||
region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
|
||||
LOG.debug("Suffix data size={}, new segment data size={}, "
|
||||
+ "suffix heap size={}," + "new segment heap size={}"
|
||||
+ "suffix off heap size={}," + "new segment off heap size={}"
|
||||
|
@ -164,7 +167,7 @@ public class CompactionPipeline {
|
|||
private static long getSegmentsHeapSize(List<? extends Segment> list) {
|
||||
long res = 0;
|
||||
for (Segment segment : list) {
|
||||
res += segment.heapSize();
|
||||
res += segment.getHeapSize();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -172,7 +175,7 @@ public class CompactionPipeline {
|
|||
private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
|
||||
long res = 0;
|
||||
for (Segment segment : list) {
|
||||
res += segment.offHeapSize();
|
||||
res += segment.getOffHeapSize();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -180,7 +183,7 @@ public class CompactionPipeline {
|
|||
private static long getSegmentsKeySize(List<? extends Segment> list) {
|
||||
long res = 0;
|
||||
for (Segment segment : list) {
|
||||
res += segment.keySize();
|
||||
res += segment.getDataSize();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -211,15 +214,17 @@ public class CompactionPipeline {
|
|||
int i = 0;
|
||||
for (ImmutableSegment s : pipeline) {
|
||||
if ( s.canBeFlattened() ) {
|
||||
MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated
|
||||
// size to be updated
|
||||
MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
|
||||
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
|
||||
(CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action);
|
||||
replaceAtIndex(i,newS);
|
||||
if(region != null) {
|
||||
// update the global memstore size counter
|
||||
// upon flattening there is no change in the data size
|
||||
region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize(),
|
||||
newMemstoreAccounting.getOffHeapSize()));
|
||||
// Update the global memstore size counter upon flattening there is no change in the
|
||||
// data size
|
||||
MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
|
||||
Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!");
|
||||
region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
}
|
||||
LOG.debug("Compaction pipeline segment {} flattened", s);
|
||||
return true;
|
||||
|
@ -254,19 +259,18 @@ public class CompactionPipeline {
|
|||
return minSequenceId;
|
||||
}
|
||||
|
||||
public MemStoreSizing getTailSizing() {
|
||||
public MemStoreSize getTailSize() {
|
||||
LinkedList<? extends Segment> localCopy = readOnlyCopy;
|
||||
if (localCopy.isEmpty()) return new MemStoreSizing();
|
||||
return new MemStoreSizing(localCopy.peekLast().getMemStoreSize());
|
||||
return localCopy.isEmpty()? new MemStoreSize(): localCopy.peekLast().getMemStoreSize();
|
||||
}
|
||||
|
||||
public MemStoreSizing getPipelineSizing() {
|
||||
MemStoreSizing memStoreSizing = new MemStoreSizing();
|
||||
public MemStoreSize getPipelineSize() {
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
LinkedList<? extends Segment> localCopy = readOnlyCopy;
|
||||
for (Segment segment : localCopy) {
|
||||
memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
|
||||
}
|
||||
return memStoreSizing;
|
||||
return memStoreSizing.getMemStoreSize();
|
||||
}
|
||||
|
||||
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
|
||||
|
|
|
@ -48,7 +48,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
|
|||
for (ImmutableSegment s : segments) {
|
||||
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
|
||||
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
|
||||
this.keySize += s.keySize();
|
||||
this.keySize += s.getDataSize();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -170,7 +170,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
|
|||
* @return Sum of all cell sizes.
|
||||
*/
|
||||
@Override
|
||||
public long keySize() {
|
||||
public long getDataSize() {
|
||||
return this.keySize;
|
||||
}
|
||||
|
||||
|
@ -178,10 +178,10 @@ public class CompositeImmutableSegment extends ImmutableSegment {
|
|||
* @return The heap size of this segment.
|
||||
*/
|
||||
@Override
|
||||
public long heapSize() {
|
||||
public long getHeapSize() {
|
||||
long result = 0;
|
||||
for (ImmutableSegment s : segments) {
|
||||
result += s.heapSize();
|
||||
result += s.getHeapSize();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -190,7 +190,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
|
|||
* Updates the heap size counter of the segment by the given delta
|
||||
*/
|
||||
@Override
|
||||
protected void incSize(long delta, long heapOverhead, long offHeapOverhead) {
|
||||
public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
|
|
|
@ -97,26 +97,20 @@ public class DefaultMemStore extends AbstractMemStore {
|
|||
return new MemStoreSnapshot(this.snapshotId, this.snapshot);
|
||||
}
|
||||
|
||||
/**
|
||||
* On flush, how much memory we will clear from the active cell set.
|
||||
*
|
||||
* @return size of data that is going to be flushed from active set
|
||||
*/
|
||||
@Override
|
||||
public MemStoreSize getFlushableSize() {
|
||||
MemStoreSize snapshotSize = getSnapshotSize();
|
||||
return snapshotSize.getDataSize() > 0 ? snapshotSize
|
||||
: new MemStoreSize(active.getMemStoreSize());
|
||||
MemStoreSize mss = getSnapshotSize();
|
||||
return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long keySize() {
|
||||
return this.active.keySize();
|
||||
return this.active.getDataSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long heapSize() {
|
||||
return this.active.heapSize();
|
||||
return this.active.getHeapSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -154,7 +148,7 @@ public class DefaultMemStore extends AbstractMemStore {
|
|||
|
||||
@Override
|
||||
public MemStoreSize size() {
|
||||
return new MemStoreSize(active.getMemStoreSize());
|
||||
return active.getMemStoreSize();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -193,26 +187,27 @@ public class DefaultMemStore extends AbstractMemStore {
|
|||
byte [] fam = Bytes.toBytes("col");
|
||||
byte [] qf = Bytes.toBytes("umn");
|
||||
byte [] empty = new byte[0];
|
||||
MemStoreSizing memstoreSizing = new MemStoreSizing();
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
for (int i = 0; i < count; i++) {
|
||||
// Give each its own ts
|
||||
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
|
||||
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
|
||||
}
|
||||
LOG.info("memstore1 estimated size="
|
||||
+ (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
|
||||
LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
|
||||
memStoreSizing.getMemStoreSize().getHeapSize());
|
||||
for (int i = 0; i < count; i++) {
|
||||
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
|
||||
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing);
|
||||
}
|
||||
LOG.info("memstore1 estimated size (2nd loading of same data)="
|
||||
+ (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
|
||||
LOG.info("memstore1 estimated size (2nd loading of same data)={}",
|
||||
memStoreSizing.getMemStoreSize().getDataSize() +
|
||||
memStoreSizing.getMemStoreSize().getHeapSize());
|
||||
// Make a variably sized memstore.
|
||||
DefaultMemStore memstore2 = new DefaultMemStore();
|
||||
memstoreSizing = new MemStoreSizing();
|
||||
memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
for (int i = 0; i < count; i++) {
|
||||
memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSizing);
|
||||
memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing);
|
||||
}
|
||||
LOG.info("memstore2 estimated size="
|
||||
+ (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
|
||||
LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() +
|
||||
memStoreSizing.getMemStoreSize().getHeapSize());
|
||||
final int seconds = 30;
|
||||
LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
|
||||
LOG.info("Exiting.");
|
||||
|
|
|
@ -291,7 +291,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
|
||||
|
||||
// Track data size in all memstores
|
||||
private final MemStoreSizing memStoreSize = new MemStoreSizing();
|
||||
private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
|
||||
private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
|
||||
|
||||
// Debug possible data loss due to WAL off
|
||||
|
@ -1209,36 +1209,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* Increase the size of mem store in this region and the size of global mem
|
||||
* store
|
||||
*/
|
||||
public void incMemStoreSize(MemStoreSize memStoreSize) {
|
||||
if (this.rsAccounting != null) {
|
||||
rsAccounting.incGlobalMemStoreSize(memStoreSize);
|
||||
}
|
||||
long dataSize;
|
||||
synchronized (this.memStoreSize) {
|
||||
this.memStoreSize.incMemStoreSize(memStoreSize);
|
||||
dataSize = this.memStoreSize.getDataSize();
|
||||
}
|
||||
checkNegativeMemStoreDataSize(dataSize, memStoreSize.getDataSize());
|
||||
void incMemStoreSize(MemStoreSize mss) {
|
||||
incMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
}
|
||||
|
||||
public void decrMemStoreSize(MemStoreSize memStoreSize) {
|
||||
void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
if (this.rsAccounting != null) {
|
||||
rsAccounting.decGlobalMemStoreSize(memStoreSize);
|
||||
rsAccounting.incGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
}
|
||||
long size;
|
||||
synchronized (this.memStoreSize) {
|
||||
this.memStoreSize.decMemStoreSize(memStoreSize);
|
||||
size = this.memStoreSize.getDataSize();
|
||||
long dataSize =
|
||||
this.memStoreSizing.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
checkNegativeMemStoreDataSize(dataSize, dataSizeDelta);
|
||||
}
|
||||
|
||||
void decrMemStoreSize(MemStoreSize mss) {
|
||||
decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
}
|
||||
|
||||
void decrMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
if (this.rsAccounting != null) {
|
||||
rsAccounting.decGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
}
|
||||
checkNegativeMemStoreDataSize(size, -memStoreSize.getDataSize());
|
||||
long dataSize =
|
||||
this.memStoreSizing.decMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
checkNegativeMemStoreDataSize(dataSize, -dataSizeDelta);
|
||||
}
|
||||
|
||||
private void checkNegativeMemStoreDataSize(long memStoreDataSize, long delta) {
|
||||
// This is extremely bad if we make memStoreSize negative. Log as much info on the offending
|
||||
// caller as possible. (memStoreSize might be a negative value already -- freeing memory)
|
||||
// This is extremely bad if we make memStoreSizing negative. Log as much info on the offending
|
||||
// caller as possible. (memStoreSizing might be a negative value already -- freeing memory)
|
||||
if (memStoreDataSize < 0) {
|
||||
LOG.error("Asked to modify this region's (" + this.toString()
|
||||
+ ") memStoreSize to a negative value which is incorrect. Current memStoreSize="
|
||||
+ ") memStoreSizing to a negative value which is incorrect. Current memStoreSizing="
|
||||
+ (memStoreDataSize - delta) + ", delta=" + delta, new Exception());
|
||||
}
|
||||
}
|
||||
|
@ -1273,17 +1275,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
@Override
|
||||
public long getMemStoreDataSize() {
|
||||
return memStoreSize.getDataSize();
|
||||
return memStoreSizing.getDataSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMemStoreHeapSize() {
|
||||
return memStoreSize.getHeapSize();
|
||||
return memStoreSizing.getHeapSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMemStoreOffHeapSize() {
|
||||
return memStoreSize.getOffHeapSize();
|
||||
return memStoreSizing.getOffHeapSize();
|
||||
}
|
||||
|
||||
/** @return store services for this region, to access services required by store level needs */
|
||||
|
@ -1554,7 +1556,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
int failedfFlushCount = 0;
|
||||
int flushCount = 0;
|
||||
long tmp = 0;
|
||||
long remainingSize = this.memStoreSize.getDataSize();
|
||||
long remainingSize = this.memStoreSizing.getDataSize();
|
||||
while (remainingSize > 0) {
|
||||
try {
|
||||
internalFlushcache(status);
|
||||
|
@ -1563,7 +1565,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
" (carrying snapshot?) " + this);
|
||||
}
|
||||
flushCount++;
|
||||
tmp = this.memStoreSize.getDataSize();
|
||||
tmp = this.memStoreSizing.getDataSize();
|
||||
if (tmp >= remainingSize) {
|
||||
failedfFlushCount++;
|
||||
}
|
||||
|
@ -1597,13 +1599,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
// close each store in parallel
|
||||
for (HStore store : stores.values()) {
|
||||
MemStoreSize flushableSize = store.getFlushableSize();
|
||||
if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) {
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
if (!(abort || mss.getDataSize() == 0 || writestate.readOnly)) {
|
||||
if (getRegionServerServices() != null) {
|
||||
getRegionServerServices().abort("Assertion failed while closing store "
|
||||
+ getRegionInfo().getRegionNameAsString() + " " + store
|
||||
+ ". flushableSize expected=0, actual= " + flushableSize
|
||||
+ ". Current memStoreSize=" + getMemStoreDataSize() + ". Maybe a coprocessor "
|
||||
+ ". flushableSize expected=0, actual={" + mss
|
||||
+ "}. Current memStoreSize=" + this.memStoreSizing.getMemStoreSize() +
|
||||
". Maybe a coprocessor "
|
||||
+ "operation failed and left the memstore in a partially updated state.", null);
|
||||
}
|
||||
}
|
||||
|
@ -1646,9 +1649,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
this.closed.set(true);
|
||||
if (!canFlush) {
|
||||
this.decrMemStoreSize(new MemStoreSize(memStoreSize));
|
||||
} else if (memStoreSize.getDataSize() != 0) {
|
||||
LOG.error("Memstore data size is " + memStoreSize.getDataSize());
|
||||
decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
|
||||
} else if (this.memStoreSizing.getDataSize() != 0) {
|
||||
LOG.error("Memstore data size is {}", this.memStoreSizing.getDataSize());
|
||||
}
|
||||
if (coprocessorHost != null) {
|
||||
status.setStatus("Running coprocessor post-close hooks");
|
||||
|
@ -1781,7 +1784,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* @return True if its worth doing a flush before we put up the close flag.
|
||||
*/
|
||||
private boolean worthPreFlushing() {
|
||||
return this.memStoreSize.getDataSize() >
|
||||
return this.memStoreSizing.getDataSize() >
|
||||
this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
|
||||
}
|
||||
|
||||
|
@ -2399,12 +2402,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// bulk loaded file between memory and existing hfiles. It wants a good seqeunceId that belongs
|
||||
// to no other that it can use to associate with the bulk load. Hence this little dance below
|
||||
// to go get one.
|
||||
if (this.memStoreSize.getDataSize() <= 0) {
|
||||
if (this.memStoreSizing.getDataSize() <= 0) {
|
||||
// Take an update lock so no edits can come into memory just yet.
|
||||
this.updatesLock.writeLock().lock();
|
||||
WriteEntry writeEntry = null;
|
||||
try {
|
||||
if (this.memStoreSize.getDataSize() <= 0) {
|
||||
if (this.memStoreSizing.getDataSize() <= 0) {
|
||||
// Presume that if there are still no edits in the memstore, then there are no edits for
|
||||
// this region out in the WAL subsystem so no need to do any trickery clearing out
|
||||
// edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
|
||||
|
@ -2446,7 +2449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// block waiting for the lock for internal flush
|
||||
this.updatesLock.writeLock().lock();
|
||||
status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
|
||||
MemStoreSizing totalSizeOfFlushableStores = new MemStoreSizing();
|
||||
MemStoreSizing totalSizeOfFlushableStores = new NonThreadSafeMemStoreSizing();
|
||||
|
||||
Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
|
||||
for (HStore store : storesToFlush) {
|
||||
|
@ -2535,14 +2538,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
if (!isAllFamilies(storesToFlush)) {
|
||||
perCfExtras = new StringBuilder();
|
||||
for (HStore store: storesToFlush) {
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
perCfExtras.append("; ").append(store.getColumnFamilyName());
|
||||
perCfExtras.append("=")
|
||||
.append(StringUtils.byteDesc(store.getFlushableSize().getDataSize()));
|
||||
perCfExtras.append("={dataSize=")
|
||||
.append(StringUtils.byteDesc(mss.getDataSize()));
|
||||
perCfExtras.append(", heapSize=")
|
||||
.append(StringUtils.byteDesc(mss.getHeapSize()));
|
||||
perCfExtras.append(", offHeapSize=")
|
||||
.append(StringUtils.byteDesc(mss.getOffHeapSize()));
|
||||
perCfExtras.append("}");
|
||||
}
|
||||
}
|
||||
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
|
||||
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," +
|
||||
" memstore data size=" + StringUtils.byteDesc(this.memStoreSize.getDataSize()) +
|
||||
" memstore heap size=" + StringUtils.byteDesc(this.memStoreSize.getHeapSize()) +
|
||||
" memstore data size=" + StringUtils.byteDesc(mss.getDataSize()) +
|
||||
" memstore heap size=" + StringUtils.byteDesc(mss.getHeapSize()) +
|
||||
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
|
||||
((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
|
||||
}
|
||||
|
@ -2662,7 +2672,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
storeFlushCtxs.clear();
|
||||
|
||||
// Set down the memstore size by amount of flush.
|
||||
this.decrMemStoreSize(prepareResult.totalFlushableSize);
|
||||
MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
|
||||
this.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
|
||||
if (wal != null) {
|
||||
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
|
||||
|
@ -2729,12 +2740,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
long time = EnvironmentEdgeManager.currentTime() - startTime;
|
||||
long flushableDataSize = prepareResult.totalFlushableSize.getDataSize();
|
||||
long flushableHeapSize = prepareResult.totalFlushableSize.getHeapSize();
|
||||
long memstoresize = this.memStoreSize.getDataSize();
|
||||
MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
|
||||
long memstoresize = this.memStoreSizing.getMemStoreSize().getDataSize();
|
||||
String msg = "Finished memstore flush;"
|
||||
+ " data size ~" + StringUtils.byteDesc(flushableDataSize) + "/" + flushableDataSize
|
||||
+ ", heap size ~" + StringUtils.byteDesc(flushableHeapSize) + "/" + flushableHeapSize
|
||||
+ " data size ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize()
|
||||
+ ", heap size ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize()
|
||||
+ ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
|
||||
+ " for " + this.getRegionInfo().getEncodedName() + " in " + time + "ms, sequenceid="
|
||||
+ flushOpSeqId + ", compaction requested=" + compactionRequested
|
||||
|
@ -2744,7 +2754,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
if (rsServices != null && rsServices.getMetrics() != null) {
|
||||
rsServices.getMetrics().updateFlush(time - startTime,
|
||||
prepareResult.totalFlushableSize.getDataSize(), flushedOutputFileSize);
|
||||
mss.getDataSize(), flushedOutputFileSize);
|
||||
}
|
||||
|
||||
return new FlushResultImpl(compactionRequested ?
|
||||
|
@ -3056,7 +3066,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
protected void writeMiniBatchOperationsToMemStore(
|
||||
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)
|
||||
throws IOException {
|
||||
MemStoreSizing memStoreAccounting = new MemStoreSizing();
|
||||
MemStoreSizing memStoreAccounting = new NonThreadSafeMemStoreSizing();
|
||||
visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
|
||||
// We need to update the sequence id for following reasons.
|
||||
// 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
|
||||
|
@ -3069,7 +3079,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return true;
|
||||
});
|
||||
// update memStore size
|
||||
region.incMemStoreSize(memStoreAccounting);
|
||||
region.incMemStoreSize(memStoreAccounting.getDataSize(), memStoreAccounting.getHeapSize(),
|
||||
memStoreAccounting.getOffHeapSize());
|
||||
}
|
||||
|
||||
public boolean isDone() {
|
||||
|
@ -4263,8 +4274,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// If catalog region, do not impose resource constraints or block updates.
|
||||
if (this.getRegionInfo().isMetaRegion()) return;
|
||||
|
||||
if (this.memStoreSize.getHeapSize()
|
||||
+ this.memStoreSize.getOffHeapSize() > this.blockingMemStoreSize) {
|
||||
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
|
||||
if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) {
|
||||
blockedRequestsCount.increment();
|
||||
requestFlush();
|
||||
// Don't print current limit because it will vary too much. The message is used as a key
|
||||
|
@ -4634,7 +4645,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
boolean flush = false;
|
||||
MemStoreSizing memstoreSize = new MemStoreSizing();
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
for (Cell cell: val.getCells()) {
|
||||
// Check this edit is for me. Also, guard against writing the special
|
||||
// METACOLUMN info such as HBASE::CACHEFLUSH entries
|
||||
|
@ -4677,15 +4688,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
PrivateCellUtil.setSequenceId(cell, currentReplaySeqId);
|
||||
|
||||
restoreEdit(store, cell, memstoreSize);
|
||||
restoreEdit(store, cell, memStoreSizing);
|
||||
editsCount++;
|
||||
}
|
||||
MemStoreSize mss = memStoreSizing.getMemStoreSize();
|
||||
if (this.rsAccounting != null) {
|
||||
rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(),
|
||||
memstoreSize);
|
||||
rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss);
|
||||
}
|
||||
incMemStoreSize(memstoreSize);
|
||||
flush = isFlushSize(this.memStoreSize);
|
||||
incMemStoreSize(mss);
|
||||
flush = isFlushSize(this.memStoreSizing.getMemStoreSize());
|
||||
if (flush) {
|
||||
internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
|
||||
FlushLifeCycleTracker.DUMMY);
|
||||
|
@ -4995,8 +5006,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
replayFlushInStores(flush, prepareFlushResult, true);
|
||||
|
||||
// Set down the memstore size by amount of flush.
|
||||
this.decrMemStoreSize(prepareFlushResult.totalFlushableSize);
|
||||
|
||||
this.decrMemStoreSize(prepareFlushResult.totalFlushableSize.getMemStoreSize());
|
||||
this.prepareFlushResult = null;
|
||||
writestate.flushing = false;
|
||||
} else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
|
||||
|
@ -5028,7 +5038,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
replayFlushInStores(flush, prepareFlushResult, true);
|
||||
|
||||
// Set down the memstore size by amount of flush.
|
||||
this.decrMemStoreSize(prepareFlushResult.totalFlushableSize);
|
||||
this.decrMemStoreSize(prepareFlushResult.totalFlushableSize.getMemStoreSize());
|
||||
|
||||
// Inspect the memstore contents to see whether the memstore contains only edits
|
||||
// with seqId smaller than the flush seqId. If so, we can discard those edits.
|
||||
|
@ -5132,7 +5142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* @throws IOException
|
||||
*/
|
||||
private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException {
|
||||
MemStoreSizing totalFreedSize = new MemStoreSizing();
|
||||
MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
|
||||
this.updatesLock.writeLock().lock();
|
||||
try {
|
||||
|
||||
|
@ -5159,7 +5169,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
} finally {
|
||||
this.updatesLock.writeLock().unlock();
|
||||
}
|
||||
return totalFreedSize;
|
||||
return totalFreedSize.getMemStoreSize();
|
||||
}
|
||||
|
||||
private MemStoreSize doDropStoreMemStoreContentsForSeqId(HStore s, long currentSeqId)
|
||||
|
@ -5282,9 +5292,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
|
||||
null : this.prepareFlushResult.storeFlushCtxs.get(family);
|
||||
if (ctx != null) {
|
||||
MemStoreSize snapshotSize = store.getFlushableSize();
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
ctx.abort();
|
||||
this.decrMemStoreSize(snapshotSize);
|
||||
this.decrMemStoreSize(mss);
|
||||
this.prepareFlushResult.storeFlushCtxs.remove(family);
|
||||
}
|
||||
}
|
||||
|
@ -5476,12 +5486,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
null : this.prepareFlushResult.storeFlushCtxs.get(
|
||||
store.getColumnFamilyDescriptor().getName());
|
||||
if (ctx != null) {
|
||||
MemStoreSize snapshotSize = store.getFlushableSize();
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
ctx.abort();
|
||||
this.decrMemStoreSize(snapshotSize);
|
||||
this.prepareFlushResult.storeFlushCtxs.remove(
|
||||
store.getColumnFamilyDescriptor().getName());
|
||||
totalFreedDataSize += snapshotSize.getDataSize();
|
||||
this.decrMemStoreSize(mss);
|
||||
this.prepareFlushResult.storeFlushCtxs.
|
||||
remove(store.getColumnFamilyDescriptor().getName());
|
||||
totalFreedDataSize += mss.getDataSize();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7363,8 +7373,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return null;
|
||||
}
|
||||
ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
|
||||
stats.setMemStoreLoad((int) (Math.min(100, (this.memStoreSize.getHeapSize() * 100) / this
|
||||
.memstoreFlushSize)));
|
||||
stats.setMemStoreLoad((int) (Math.min(100,
|
||||
(this.memStoreSizing.getMemStoreSize().getHeapSize() * 100) / this.memstoreFlushSize)));
|
||||
if (rsServices.getHeapMemoryManager() != null) {
|
||||
// the HeapMemoryManager uses -0.0 to signal a problem asking the JVM,
|
||||
// so we could just do the calculation below and we'll get a 0.
|
||||
|
@ -7425,7 +7435,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// This is assigned by mvcc either explicity in the below or in the guts of the WAL append
|
||||
// when it assigns the edit a sequencedid (A.K.A the mvcc write number).
|
||||
WriteEntry writeEntry = null;
|
||||
MemStoreSizing memstoreAccounting = new MemStoreSizing();
|
||||
MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
|
||||
try {
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -7511,7 +7521,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
} finally {
|
||||
closeRegionOperation();
|
||||
if (!mutations.isEmpty()) {
|
||||
this.incMemStoreSize(memstoreAccounting);
|
||||
this.incMemStoreSize(memstoreAccounting.getMemStoreSize());
|
||||
requestFlushIfNeeded();
|
||||
}
|
||||
}
|
||||
|
@ -7615,7 +7625,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
startRegionOperation(op);
|
||||
List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null;
|
||||
RowLock rowLock = null;
|
||||
MemStoreSizing memstoreAccounting = new MemStoreSizing();
|
||||
MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
|
||||
try {
|
||||
rowLock = getRowLockInternal(mutation.getRow(), false, null);
|
||||
lock(this.updatesLock.readLock());
|
||||
|
@ -7665,7 +7675,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
rowLock.release();
|
||||
}
|
||||
// Request a cache flush if over the limit. Do it outside update lock.
|
||||
incMemStoreSize(memstoreAccounting);
|
||||
incMemStoreSize(memstoreAccounting.getMemStoreSize());
|
||||
requestFlushIfNeeded();
|
||||
closeRegionOperation(op);
|
||||
if (this.metricsRegion != null) {
|
||||
|
@ -8546,7 +8556,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
private void requestFlushIfNeeded() throws RegionTooBusyException {
|
||||
if(isFlushSize(memStoreSize)) {
|
||||
if(isFlushSize(this.memStoreSizing.getMemStoreSize())) {
|
||||
requestFlush();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2230,7 +2230,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
this.cacheFlushCount = snapshot.getCellsCount();
|
||||
this.cacheFlushSize = snapshot.getDataSize();
|
||||
committedFiles = new ArrayList<>(1);
|
||||
return new MemStoreSize(snapshot.getMemStoreSize());
|
||||
return snapshot.getMemStoreSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -50,12 +50,11 @@ public interface MemStore {
|
|||
void clearSnapshot(long id) throws UnexpectedStateException;
|
||||
|
||||
/**
|
||||
* On flush, how much memory we will clear.
|
||||
* Flush will first clear out the data in snapshot if any (It will take a second flush
|
||||
* invocation to clear the current Cell set). If snapshot is empty, current
|
||||
* Cell set will be flushed.
|
||||
*
|
||||
* @return size of data that is going to be flushed
|
||||
* @return On flush, how much memory we will clear.
|
||||
*/
|
||||
MemStoreSize getFlushableSize();
|
||||
|
||||
|
|
|
@ -550,8 +550,9 @@ class MemStoreFlusher implements FlushRequester {
|
|||
// If this is first time we've been put off, then emit a log message.
|
||||
if (fqe.getRequeueCount() <= 0) {
|
||||
// Note: We don't impose blockingStoreFiles constraint on meta regions
|
||||
LOG.warn("Region " + region.getRegionInfo().getEncodedName() + " has too many " +
|
||||
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
|
||||
LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
|
||||
region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
|
||||
this.blockingWaitTime);
|
||||
if (!this.server.compactSplitThread.requestSplit(region)) {
|
||||
try {
|
||||
this.server.compactSplitThread.requestSystemCompaction(region,
|
||||
|
@ -672,6 +673,14 @@ class MemStoreFlusher implements FlushRequester {
|
|||
return false;
|
||||
}
|
||||
|
||||
private int getStoreFileCount(Region region) {
|
||||
int count = 0;
|
||||
for (Store store : region.getStores()) {
|
||||
count += store.getStorefilesCount();
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the regionserver's memstore memory usage is greater than the
|
||||
* limit. If so, flush regions with the biggest memstores until we're down
|
||||
|
@ -755,10 +764,10 @@ class MemStoreFlusher implements FlushRequester {
|
|||
}
|
||||
}
|
||||
|
||||
private void logMsg(String string1, long val, long max) {
|
||||
LOG.info("Blocking updates on " + server.toString() + ": " + string1 + " "
|
||||
+ TraditionalBinaryPrefix.long2String(val, "", 1) + " is >= than blocking "
|
||||
+ TraditionalBinaryPrefix.long2String(max, "", 1) + " size");
|
||||
private void logMsg(String type, long val, long max) {
|
||||
LOG.info("Blocking updates: {} {} is >= blocking {}", type,
|
||||
TraditionalBinaryPrefix.long2String(val, "", 1),
|
||||
TraditionalBinaryPrefix.long2String(max, "", 1));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,64 +19,56 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Reports the data size part and total heap space occupied by the MemStore.
|
||||
* Read-only.
|
||||
* Data structure of three longs.
|
||||
* Convenient package in which to carry current state of three counters.
|
||||
* <p>Immutable!</p>
|
||||
* @see MemStoreSizing
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
|
||||
public class MemStoreSize {
|
||||
// MemStore size tracks 3 sizes:
|
||||
// (1) data size: the aggregated size of all key-value not including meta data such as
|
||||
// index, time range etc.
|
||||
// (2) heap size: the aggregated size of all data that is allocated on-heap including all
|
||||
// key-values that reside on-heap and the metadata that resides on-heap
|
||||
// (3) off-heap size: the aggregated size of all data that is allocated off-heap including all
|
||||
// key-values that reside off-heap and the metadata that resides off-heap
|
||||
//
|
||||
// 3 examples to illustrate their usage:
|
||||
// Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated
|
||||
// on-heap. The counters are <100MB, 120MB, 0>, respectively.
|
||||
// Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata
|
||||
// allocated on-heap (e.g, CAM index). The counters are <100MB, 20MB, 100MB>, respectively.
|
||||
// Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB
|
||||
// are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated
|
||||
// off-heap (e.g, CCM index) and 5MB allocated on-heap (e.g, CSLM index in active).
|
||||
// The counters are <100MB, 10MB, 110MB>, respectively.
|
||||
|
||||
/**
|
||||
*'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can
|
||||
* be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap
|
||||
* or off heap LABs
|
||||
* be in on heap or off heap area depending on the MSLAB and its configuration to be using on
|
||||
* heap or off heap LABs
|
||||
*/
|
||||
protected volatile long dataSize;
|
||||
private final long dataSize;
|
||||
|
||||
/** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
|
||||
/**'getHeapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
|
||||
* When Cells in on heap area, this will include the cells data size as well.
|
||||
*/
|
||||
protected volatile long heapSize;
|
||||
private final long heapSize;
|
||||
|
||||
/** off-heap size: the aggregated size of all data that is allocated off-heap including all
|
||||
* key-values that reside off-heap and the metadata that resides off-heap
|
||||
*/
|
||||
protected volatile long offHeapSize;
|
||||
private final long offHeapSize;
|
||||
|
||||
public MemStoreSize() {
|
||||
/**
|
||||
* Package private constructor.
|
||||
*/
|
||||
MemStoreSize() {
|
||||
this(0L, 0L, 0L);
|
||||
}
|
||||
|
||||
public MemStoreSize(long dataSize, long heapSize, long offHeapSize) {
|
||||
/**
|
||||
* Package private constructor.
|
||||
*/
|
||||
MemStoreSize(long dataSize, long heapSize, long offHeapSize) {
|
||||
this.dataSize = dataSize;
|
||||
this.heapSize = heapSize;
|
||||
this.offHeapSize = offHeapSize;
|
||||
}
|
||||
|
||||
protected MemStoreSize(MemStoreSize memStoreSize) {
|
||||
this.dataSize = memStoreSize.dataSize;
|
||||
this.heapSize = memStoreSize.heapSize;
|
||||
this.offHeapSize = memStoreSize.offHeapSize;
|
||||
/**
|
||||
* Package private constructor.
|
||||
*/
|
||||
MemStoreSize(MemStoreSize memStoreSize) {
|
||||
this.dataSize = memStoreSize.getDataSize();
|
||||
this.heapSize = memStoreSize.getHeapSize();
|
||||
this.offHeapSize = memStoreSize.getOffHeapSize();
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0;
|
||||
}
|
||||
|
@ -101,24 +93,22 @@ public class MemStoreSize {
|
|||
if (!(obj instanceof MemStoreSize)) {
|
||||
return false;
|
||||
}
|
||||
MemStoreSize other = (MemStoreSize) obj;
|
||||
return this.dataSize == other.dataSize
|
||||
&& this.heapSize == other.heapSize
|
||||
&& this.offHeapSize == other.offHeapSize;
|
||||
MemStoreSize other = (MemStoreSize)obj;
|
||||
return this.dataSize == other.dataSize && this.heapSize == other.heapSize &&
|
||||
this.offHeapSize == other.offHeapSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
long h = 13 * this.dataSize;
|
||||
h = h + 14 * this.heapSize;
|
||||
h = h + 15 * this.offHeapSize;
|
||||
long h = 31 * this.dataSize;
|
||||
h = h + 31 * this.heapSize;
|
||||
h = h + 31 * this.offHeapSize;
|
||||
return (int) h;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "dataSize=" + this.dataSize
|
||||
+ " , heapSize=" + this.heapSize
|
||||
+ " , offHeapSize=" + this.offHeapSize;
|
||||
return "dataSize=" + this.dataSize + ", getHeapSize=" + this.heapSize +
|
||||
", getOffHeapSize=" + this.offHeapSize;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,61 +21,96 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
|
||||
/**
|
||||
* Accounting of current heap and data sizes.
|
||||
* Allows read/write on data/heap size as opposed to {@Link MemStoreSize} which is read-only.
|
||||
* For internal use.
|
||||
* @see MemStoreSize
|
||||
* Tracks 3 sizes:
|
||||
* <ol>
|
||||
* <li></li>data size: the aggregated size of all key-value not including meta data such as
|
||||
* index, time range etc.
|
||||
* </li>
|
||||
* <li>heap size: the aggregated size of all data that is allocated on-heap including all
|
||||
* key-values that reside on-heap and the metadata that resides on-heap
|
||||
* </li>
|
||||
* <li></li>off-heap size: the aggregated size of all data that is allocated off-heap including all
|
||||
* key-values that reside off-heap and the metadata that resides off-heap
|
||||
* </li>
|
||||
* </ol>
|
||||
*
|
||||
* 3 examples to illustrate their usage:
|
||||
* <p>
|
||||
* Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated
|
||||
* on-heap. The counters are <100MB, 120MB, 0>, respectively.
|
||||
* </p>
|
||||
* <p>Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata
|
||||
* allocated on-heap (e.g, CAM index). The counters are <100MB, 20MB, 100MB>, respectively.
|
||||
* </p>
|
||||
* <p>
|
||||
* Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB
|
||||
* are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated
|
||||
* off-heap (e.g, CCM index) and 5MB allocated on-heap (e.g, CSLM index in active).
|
||||
* The counters are <100MB, 10MB, 110MB>, respectively.
|
||||
* </p>
|
||||
*
|
||||
* Like {@link TimeRangeTracker}, it has thread-safe and non-thread-safe implementations.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MemStoreSizing extends MemStoreSize {
|
||||
public static final MemStoreSizing DUD = new MemStoreSizing() {
|
||||
public interface MemStoreSizing {
|
||||
static final MemStoreSizing DUD = new MemStoreSizing() {
|
||||
private final MemStoreSize mss = new MemStoreSize();
|
||||
|
||||
@Override public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
|
||||
long offHeapSizeDelta) {
|
||||
throw new RuntimeException("I'm a dud, you can't use me!");
|
||||
@Override
|
||||
public MemStoreSize getMemStoreSize() {
|
||||
return this.mss;
|
||||
}
|
||||
|
||||
@Override public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta,
|
||||
@Override
|
||||
public long getDataSize() {
|
||||
return this.mss.getDataSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getHeapSize() {
|
||||
return this.mss.getHeapSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getOffHeapSize() {
|
||||
return this.mss.getOffHeapSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
|
||||
long offHeapSizeDelta) {
|
||||
throw new RuntimeException("I'm a dud, you can't use me!");
|
||||
throw new RuntimeException("I'm a DUD, you can't use me!");
|
||||
}
|
||||
};
|
||||
|
||||
public MemStoreSizing() {
|
||||
super();
|
||||
/**
|
||||
* @return The new dataSize ONLY as a convenience
|
||||
*/
|
||||
long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta);
|
||||
|
||||
default long incMemStoreSize(MemStoreSize delta) {
|
||||
return incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
|
||||
}
|
||||
|
||||
public MemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
|
||||
super(dataSize, heapSize, offHeapSize);
|
||||
/**
|
||||
* @return The new dataSize ONLY as a convenience
|
||||
*/
|
||||
default long decMemStoreSize(long dataSizeDelta, long heapSizeDelta,
|
||||
long offHeapSizeDelta) {
|
||||
return incMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
|
||||
}
|
||||
|
||||
public MemStoreSizing(MemStoreSize memStoreSize) {
|
||||
super(memStoreSize);
|
||||
default long decMemStoreSize(MemStoreSize delta) {
|
||||
return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), -delta.getOffHeapSize());
|
||||
}
|
||||
|
||||
public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
this.dataSize += dataSizeDelta;
|
||||
this.heapSize += heapSizeDelta;
|
||||
this.offHeapSize += offHeapSizeDelta;
|
||||
}
|
||||
|
||||
public void incMemStoreSize(MemStoreSize delta) {
|
||||
incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
|
||||
}
|
||||
|
||||
public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
this.dataSize -= dataSizeDelta;
|
||||
this.heapSize -= heapSizeDelta;
|
||||
this.offHeapSize -= offHeapSizeDelta;
|
||||
}
|
||||
|
||||
public void decMemStoreSize(MemStoreSize delta) {
|
||||
decMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
|
||||
}
|
||||
|
||||
public void empty() {
|
||||
this.dataSize = 0L;
|
||||
this.heapSize = 0L;
|
||||
this.offHeapSize = 0L;
|
||||
}
|
||||
long getDataSize();
|
||||
long getHeapSize();
|
||||
long getOffHeapSize();
|
||||
|
||||
/**
|
||||
* @return Use this datastructure to return all three settings, {@link #getDataSize()},
|
||||
* {@link #getHeapSize()}, and {@link #getOffHeapSize()}, in the one go.
|
||||
*/
|
||||
MemStoreSize getMemStoreSize();
|
||||
}
|
|
@ -44,7 +44,7 @@ public class MutableSegment extends Segment {
|
|||
|
||||
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
|
||||
super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
|
||||
incSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata
|
||||
incMemStoreSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -89,7 +89,7 @@ public class MutableSegment extends Segment {
|
|||
int cellLen = getCellLength(cur);
|
||||
long heapSize = heapSizeChange(cur, true);
|
||||
long offHeapSize = offHeapSizeChange(cur, true);
|
||||
this.incSize(-cellLen, -heapSize, -offHeapSize);
|
||||
incMemStoreSize(-cellLen, -heapSize, -offHeapSize);
|
||||
if (memStoreSizing != null) {
|
||||
memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Accounting of current heap and data sizes.
|
||||
* <em>NOT THREAD SAFE</em>.
|
||||
* Use in a 'local' context only where just a single-thread is updating. No concurrency!
|
||||
* Used, for example, when summing all Cells in a single batch where result is then applied to the
|
||||
* Store.
|
||||
* @see ThreadSafeMemStoreSizing
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class NonThreadSafeMemStoreSizing implements MemStoreSizing {
|
||||
private long dataSize = 0;
|
||||
private long heapSize = 0;
|
||||
private long offHeapSize = 0;
|
||||
|
||||
NonThreadSafeMemStoreSizing() {
|
||||
this(0, 0, 0);
|
||||
}
|
||||
|
||||
NonThreadSafeMemStoreSizing(MemStoreSize mss) {
|
||||
this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
}
|
||||
|
||||
NonThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
|
||||
incMemStoreSize(dataSize, heapSize, offHeapSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MemStoreSize getMemStoreSize() {
|
||||
return new MemStoreSize(this.dataSize, this.heapSize, this.offHeapSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
|
||||
long offHeapSizeDelta) {
|
||||
this.offHeapSize += offHeapSizeDelta;
|
||||
this.heapSize += heapSizeDelta;
|
||||
this.dataSize += dataSizeDelta;
|
||||
return this.dataSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDataSize() {
|
||||
return dataSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getHeapSize() {
|
||||
return heapSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getOffHeapSize() {
|
||||
return offHeapSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getMemStoreSize().toString();
|
||||
}
|
||||
}
|
|
@ -131,20 +131,20 @@ public class RegionServerAccounting {
|
|||
return this.globalMemStoreOffHeapSize.sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param memStoreSize the Memstore size will be added to
|
||||
* the global Memstore size
|
||||
*/
|
||||
public void incGlobalMemStoreSize(MemStoreSize memStoreSize) {
|
||||
globalMemStoreDataSize.add(memStoreSize.getDataSize());
|
||||
globalMemStoreHeapSize.add(memStoreSize.getHeapSize());
|
||||
globalMemStoreOffHeapSize.add(memStoreSize.getOffHeapSize());
|
||||
void incGlobalMemStoreSize(MemStoreSize mss) {
|
||||
incGlobalMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
}
|
||||
|
||||
public void decGlobalMemStoreSize(MemStoreSize memStoreSize) {
|
||||
globalMemStoreDataSize.add(-memStoreSize.getDataSize());
|
||||
globalMemStoreHeapSize.add(-memStoreSize.getHeapSize());
|
||||
globalMemStoreOffHeapSize.add(-memStoreSize.getOffHeapSize());
|
||||
public void incGlobalMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
globalMemStoreDataSize.add(dataSizeDelta);
|
||||
globalMemStoreHeapSize.add(heapSizeDelta);
|
||||
globalMemStoreOffHeapSize.add(offHeapSizeDelta);
|
||||
}
|
||||
|
||||
public void decGlobalMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
globalMemStoreDataSize.add(-dataSizeDelta);
|
||||
globalMemStoreHeapSize.add(-heapSizeDelta);
|
||||
globalMemStoreOffHeapSize.add(-offHeapSizeDelta);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -231,7 +231,7 @@ public class RegionServerAccounting {
|
|||
// the region open operation. No need to handle multi thread issues on one region's entry in
|
||||
// this Map.
|
||||
if (replayEdistsSize == null) {
|
||||
replayEdistsSize = new MemStoreSizing();
|
||||
replayEdistsSize = new ThreadSafeMemStoreSizing();
|
||||
replayEditsPerRegion.put(regionName, replayEdistsSize);
|
||||
}
|
||||
replayEdistsSize.incMemStoreSize(memStoreSize);
|
||||
|
@ -244,10 +244,11 @@ public class RegionServerAccounting {
|
|||
* @param regionName the region which could not open.
|
||||
*/
|
||||
public void rollbackRegionReplayEditsSize(byte[] regionName) {
|
||||
MemStoreSize replayEditsSize = replayEditsPerRegion.get(regionName);
|
||||
if (replayEditsSize != null) {
|
||||
MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName);
|
||||
if (replayEditsSizing != null) {
|
||||
clearRegionReplayEditsSize(regionName);
|
||||
decGlobalMemStoreSize(replayEditsSize);
|
||||
decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(),
|
||||
replayEditsSizing.getOffHeapSize());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -65,8 +65,8 @@ public class RegionServicesForStores {
|
|||
region.unblockUpdates();
|
||||
}
|
||||
|
||||
public void addMemStoreSize(MemStoreSize size) {
|
||||
region.incMemStoreSize(size);
|
||||
public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
}
|
||||
|
||||
public RegionInfo getRegionInfo() {
|
||||
|
|
|
@ -584,9 +584,6 @@ public class ScannerContext {
|
|||
|
||||
/**
|
||||
* Set all fields together.
|
||||
* @param batch
|
||||
* @param sizeScope
|
||||
* @param dataSize
|
||||
*/
|
||||
void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize,
|
||||
LimitScope timeScope, long time) {
|
||||
|
|
|
@ -45,7 +45,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
* segments from active set to snapshot set in the default implementation.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class Segment {
|
||||
public abstract class Segment implements MemStoreSizing {
|
||||
|
||||
public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
|
||||
+ 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, memStoreSizing,
|
||||
|
@ -59,9 +59,9 @@ public abstract class Segment {
|
|||
private final CellComparator comparator;
|
||||
protected long minSequenceId;
|
||||
private MemStoreLAB memStoreLAB;
|
||||
// Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not
|
||||
// Sum of sizes of all Cells added to this Segment. Cell's HeapSize is considered. This is not
|
||||
// including the heap overhead of this class.
|
||||
protected final MemStoreSizing segmentSize;
|
||||
protected final MemStoreSizing memStoreSizing;
|
||||
protected final TimeRangeTracker timeRangeTracker;
|
||||
protected volatile boolean tagsPresent;
|
||||
|
||||
|
@ -69,7 +69,9 @@ public abstract class Segment {
|
|||
// and there is no need in true Segments state
|
||||
protected Segment(CellComparator comparator, TimeRangeTracker trt) {
|
||||
this.comparator = comparator;
|
||||
this.segmentSize = new MemStoreSizing();
|
||||
// Do we need to be thread safe always? What if ImmutableSegment?
|
||||
// DITTO for the TimeRangeTracker below.
|
||||
this.memStoreSizing = new ThreadSafeMemStoreSizing();
|
||||
this.timeRangeTracker = trt;
|
||||
}
|
||||
|
||||
|
@ -85,7 +87,9 @@ public abstract class Segment {
|
|||
OffHeapSize += memStoreSize.getOffHeapSize();
|
||||
}
|
||||
this.comparator = comparator;
|
||||
this.segmentSize = new MemStoreSizing(dataSize, heapSize, OffHeapSize);
|
||||
// Do we need to be thread safe always? What if ImmutableSegment?
|
||||
// DITTO for the TimeRangeTracker below.
|
||||
this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize);
|
||||
this.timeRangeTracker = trt;
|
||||
}
|
||||
|
||||
|
@ -95,7 +99,9 @@ public abstract class Segment {
|
|||
this.comparator = comparator;
|
||||
this.minSequenceId = Long.MAX_VALUE;
|
||||
this.memStoreLAB = memStoreLAB;
|
||||
this.segmentSize = new MemStoreSizing();
|
||||
// Do we need to be thread safe always? What if ImmutableSegment?
|
||||
// DITTO for the TimeRangeTracker below.
|
||||
this.memStoreSizing = new ThreadSafeMemStoreSizing();
|
||||
this.tagsPresent = false;
|
||||
this.timeRangeTracker = trt;
|
||||
}
|
||||
|
@ -105,7 +111,7 @@ public abstract class Segment {
|
|||
this.comparator = segment.getComparator();
|
||||
this.minSequenceId = segment.getMinSequenceId();
|
||||
this.memStoreLAB = segment.getMemStoreLAB();
|
||||
this.segmentSize = new MemStoreSizing(segment.getMemStoreSize());
|
||||
this.memStoreSizing = new ThreadSafeMemStoreSizing(segment.memStoreSizing.getMemStoreSize());
|
||||
this.tagsPresent = segment.isTagsPresent();
|
||||
this.timeRangeTracker = segment.getTimeRangeTracker();
|
||||
}
|
||||
|
@ -213,39 +219,29 @@ public abstract class Segment {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MemStoreSize getMemStoreSize() {
|
||||
return this.segmentSize;
|
||||
return this.memStoreSizing.getMemStoreSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Sum of all cell's size.
|
||||
*/
|
||||
public long keySize() {
|
||||
return this.segmentSize.getDataSize();
|
||||
@Override
|
||||
public long getDataSize() {
|
||||
return this.memStoreSizing.getDataSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The heap size of this segment.
|
||||
*/
|
||||
public long heapSize() {
|
||||
return this.segmentSize.getHeapSize();
|
||||
@Override
|
||||
public long getHeapSize() {
|
||||
return this.memStoreSizing.getHeapSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The off-heap size of this segment.
|
||||
*/
|
||||
public long offHeapSize() {
|
||||
return this.segmentSize.getOffHeapSize();
|
||||
@Override
|
||||
public long getOffHeapSize() {
|
||||
return this.memStoreSizing.getOffHeapSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the size counters of the segment by the given delta
|
||||
*/
|
||||
//TODO
|
||||
protected void incSize(long delta, long heapOverhead, long offHeapOverhead) {
|
||||
synchronized (this) {
|
||||
this.segmentSize.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
|
||||
}
|
||||
@Override
|
||||
public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) {
|
||||
return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
|
||||
}
|
||||
|
||||
public long getMinSequenceId() {
|
||||
|
@ -308,7 +304,7 @@ public abstract class Segment {
|
|||
}
|
||||
long heapSize = heapSizeChange(cellToAdd, succ);
|
||||
long offHeapSize = offHeapSizeChange(cellToAdd, succ);
|
||||
incSize(cellSize, heapSize, offHeapSize);
|
||||
incMemStoreSize(cellSize, heapSize, offHeapSize);
|
||||
if (memstoreSizing != null) {
|
||||
memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize);
|
||||
}
|
||||
|
@ -408,8 +404,8 @@ public abstract class Segment {
|
|||
String res = "type=" + this.getClass().getSimpleName() + ", ";
|
||||
res += "empty=" + (isEmpty()? "yes": "no") + ", ";
|
||||
res += "cellCount=" + getCellsCount() + ", ";
|
||||
res += "cellSize=" + keySize() + ", ";
|
||||
res += "totalHeapSize=" + heapSize() + ", ";
|
||||
res += "cellSize=" + getDataSize() + ", ";
|
||||
res += "totalHeapSize=" + getHeapSize() + ", ";
|
||||
res += "min timestamp=" + timeRangeTracker.getMin() + ", ";
|
||||
res += "max timestamp=" + timeRangeTracker.getMax();
|
||||
return res;
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Accounting of current heap and data sizes.
|
||||
* Thread-safe. Many threads can do updates against this single instance.
|
||||
* @see NonThreadSafeMemStoreSizing
|
||||
* @see MemStoreSize
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class ThreadSafeMemStoreSizing implements MemStoreSizing {
|
||||
// We used to tie the update of these thread counters so
|
||||
// they all changed together under one lock. This was
|
||||
// undone. Doesn't seem necessary.
|
||||
private final AtomicLong dataSize = new AtomicLong();
|
||||
private final AtomicLong heapSize = new AtomicLong();
|
||||
private final AtomicLong offHeapSize = new AtomicLong();
|
||||
|
||||
ThreadSafeMemStoreSizing() {
|
||||
this(0, 0, 0);
|
||||
}
|
||||
|
||||
ThreadSafeMemStoreSizing(MemStoreSize mss) {
|
||||
this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
}
|
||||
|
||||
ThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
|
||||
incMemStoreSize(dataSize, heapSize, offHeapSize);
|
||||
}
|
||||
|
||||
public MemStoreSize getMemStoreSize() {
|
||||
return new MemStoreSize(getDataSize(), getHeapSize(), getOffHeapSize());
|
||||
}
|
||||
|
||||
public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
this.offHeapSize.addAndGet(offHeapSizeDelta);
|
||||
this.heapSize.addAndGet(heapSizeDelta);
|
||||
return this.dataSize.addAndGet(dataSizeDelta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDataSize() {
|
||||
return dataSize.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getHeapSize() {
|
||||
return heapSize.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getOffHeapSize() {
|
||||
return offHeapSize.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getMemStoreSize().toString();
|
||||
}
|
||||
}
|
|
@ -355,7 +355,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
private long runSnapshot(final AbstractMemStore hmc, boolean useForce)
|
||||
throws IOException {
|
||||
// Save off old state.
|
||||
long oldHistorySize = hmc.getSnapshot().keySize();
|
||||
long oldHistorySize = hmc.getSnapshot().getDataSize();
|
||||
long prevTimeStamp = hmc.timeOfOldestEdit();
|
||||
|
||||
hmc.snapshot();
|
||||
|
@ -616,9 +616,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
|
||||
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
|
||||
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss);
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(4, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -667,7 +668,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
|
||||
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
|
||||
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -675,9 +676,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
+ 7 * oneCellOnCAHeapSize;
|
||||
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
|
||||
|
||||
size = memstore.getFlushableSize();
|
||||
mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(7, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -722,7 +724,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
((MyCompactingMemStore) memstore).disableCompaction();
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
|
||||
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||
// No change in the cells data size. ie. memstore size. as there is no compaction.
|
||||
|
@ -738,7 +740,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
((MyCompactingMemStore)memstore).enableCompaction();
|
||||
size = memstore.getFlushableSize();
|
||||
mss = memstore.getFlushableSize();
|
||||
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
|
||||
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||
// active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
|
||||
|
@ -751,9 +753,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
|
||||
+ CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
size = memstore.getFlushableSize();
|
||||
mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(4, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -811,9 +814,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
assertTrue(4 == numCells || 11 == numCells);
|
||||
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss);
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
numCells = s.getCellsCount();
|
||||
assertTrue(4 == numCells || 11 == numCells);
|
||||
|
@ -825,8 +829,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
|
||||
byte[] fam = Bytes.toBytes("testfamily");
|
||||
byte[] qf = Bytes.toBytes("testqualifier");
|
||||
long size = hmc.getActive().keySize();
|
||||
long heapOverhead = hmc.getActive().heapSize();
|
||||
long size = hmc.getActive().getDataSize();
|
||||
long heapOverhead = hmc.getActive().getHeapSize();
|
||||
int totalLen = 0;
|
||||
for (int i = 0; i < keys.length; i++) {
|
||||
long timestamp = System.currentTimeMillis();
|
||||
|
@ -838,8 +842,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
hmc.add(kv, null);
|
||||
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
|
||||
}
|
||||
regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size,
|
||||
hmc.getActive().heapSize() - heapOverhead, 0));
|
||||
regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
|
||||
hmc.getActive().getHeapSize() - heapOverhead, 0);
|
||||
return totalLen;
|
||||
}
|
||||
|
||||
|
@ -847,8 +851,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
|
||||
byte[] fam = Bytes.toBytes("testfamily");
|
||||
byte[] qf = Bytes.toBytes("testqualifier");
|
||||
long size = hmc.getActive().keySize();
|
||||
long heapOverhead = hmc.getActive().heapSize();
|
||||
long size = hmc.getActive().getDataSize();
|
||||
long heapOverhead = hmc.getActive().getHeapSize();
|
||||
int totalLen = 0;
|
||||
for (int i = 0; i < keys.length; i++) {
|
||||
long timestamp = System.currentTimeMillis();
|
||||
|
@ -859,8 +863,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
hmc.add(kv, null);
|
||||
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
|
||||
}
|
||||
regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size,
|
||||
hmc.getActive().heapSize() - heapOverhead, 0));
|
||||
regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
|
||||
hmc.getActive().getHeapSize() - heapOverhead, 0);
|
||||
return totalLen;
|
||||
}
|
||||
|
||||
|
|
|
@ -132,9 +132,9 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
counter += s.getCellsCount();
|
||||
}
|
||||
assertEquals(3, counter);
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
region.decrMemStoreSize(mss); // simulate flusher
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(3, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -194,9 +194,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
totalHeapSize2 = 1 * cellAfterFlushSize;
|
||||
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss);
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(4, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -224,7 +225,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
assertEquals(totalCellsLen1, region.getMemStoreDataSize());
|
||||
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
|
||||
|
||||
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||
|
@ -245,7 +246,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
((MyCompactingMemStore) memstore).disableCompaction();
|
||||
size = memstore.getFlushableSize();
|
||||
mss = memstore.getFlushableSize();
|
||||
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
|
||||
totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
|
||||
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||
|
@ -260,7 +261,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
((MyCompactingMemStore) memstore).enableCompaction();
|
||||
size = memstore.getFlushableSize();
|
||||
mss = memstore.getFlushableSize();
|
||||
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
|
||||
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
|
||||
Threads.sleep(10);
|
||||
|
@ -279,9 +280,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
|
||||
assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
size = memstore.getFlushableSize();
|
||||
mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(4, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -655,9 +657,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
|
||||
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss);
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(numOfCells, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -727,9 +730,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
|
||||
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss);
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(numOfCells, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -801,9 +805,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
|
||||
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
MemStoreSize size = memstore.getFlushableSize();
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||
region.decrMemStoreSize(size); // simulate flusher
|
||||
// simulate flusher
|
||||
region.decrMemStoreSize(mss);
|
||||
ImmutableSegment s = memstore.getSnapshot();
|
||||
assertEquals(numOfCells, s.getCellsCount());
|
||||
assertEquals(0, regionServicesForStores.getMemStoreSize());
|
||||
|
@ -895,7 +900,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
|
||||
byte[] fam = Bytes.toBytes("testfamily");
|
||||
byte[] qf = Bytes.toBytes("testqualifier");
|
||||
MemStoreSizing memstoreSizing = new MemStoreSizing();
|
||||
MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
for (int i = 0; i < keys.length; i++) {
|
||||
long timestamp = System.currentTimeMillis();
|
||||
Threads.sleep(1); // to make sure each kv gets a different ts
|
||||
|
@ -905,8 +910,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
hmc.add(kv, memstoreSizing);
|
||||
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
|
||||
}
|
||||
regionServicesForStores.addMemStoreSize(memstoreSizing);
|
||||
return memstoreSizing.getDataSize();
|
||||
MemStoreSize mss = memstoreSizing.getMemStoreSize();
|
||||
regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(),
|
||||
mss.getOffHeapSize());
|
||||
return mss.getDataSize();
|
||||
}
|
||||
|
||||
private long cellBeforeFlushSize() {
|
||||
|
|
|
@ -129,17 +129,18 @@ public class TestDefaultMemStore {
|
|||
public void testPutSameCell() {
|
||||
byte[] bytes = Bytes.toBytes(getName());
|
||||
KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
|
||||
MemStoreSizing sizeChangeForFirstCell = new MemStoreSizing();
|
||||
MemStoreSizing sizeChangeForFirstCell = new NonThreadSafeMemStoreSizing();
|
||||
this.memstore.add(kv, sizeChangeForFirstCell);
|
||||
MemStoreSizing sizeChangeForSecondCell = new MemStoreSizing();
|
||||
MemStoreSizing sizeChangeForSecondCell = new NonThreadSafeMemStoreSizing();
|
||||
this.memstore.add(kv, sizeChangeForSecondCell);
|
||||
// make sure memstore size increase won't double-count MSLAB chunk size
|
||||
assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getDataSize());
|
||||
assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getMemStoreSize().getDataSize());
|
||||
Segment segment = this.memstore.getActive();
|
||||
MemStoreLAB msLab = segment.getMemStoreLAB();
|
||||
if (msLab != null) {
|
||||
// make sure memstore size increased even when writing the same cell, if using MSLAB
|
||||
assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize());
|
||||
assertEquals(Segment.getCellLength(kv),
|
||||
sizeChangeForSecondCell.getMemStoreSize().getDataSize());
|
||||
// make sure chunk size increased even when writing the same cell, if using MSLAB
|
||||
if (msLab instanceof MemStoreLABImpl) {
|
||||
// since we add the chunkID at the 0th offset of the chunk and the
|
||||
|
@ -149,8 +150,8 @@ public class TestDefaultMemStore {
|
|||
}
|
||||
} else {
|
||||
// make sure no memstore size change w/o MSLAB
|
||||
assertEquals(0, sizeChangeForSecondCell.getDataSize());
|
||||
assertEquals(0, sizeChangeForSecondCell.getHeapSize());
|
||||
assertEquals(0, sizeChangeForSecondCell.getMemStoreSize().getDataSize());
|
||||
assertEquals(0, sizeChangeForSecondCell.getMemStoreSize().getHeapSize());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -361,8 +361,9 @@ public class TestHRegion {
|
|||
} finally {
|
||||
assertTrue("The regionserver should have thrown an exception", threwIOE);
|
||||
}
|
||||
long sz = store.getFlushableSize().getDataSize();
|
||||
assertTrue("flushable size should be zero, but it is " + sz, sz == 0);
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
assertTrue("flushable size should be zero, but it is " + mss,
|
||||
mss.getDataSize() == 0);
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
}
|
||||
|
||||
|
@ -414,9 +415,10 @@ public class TestHRegion {
|
|||
} catch (IOException expected) {
|
||||
}
|
||||
long expectedSize = onePutSize * 2;
|
||||
assertEquals("memstoreSize should be incremented", expectedSize, region.getMemStoreDataSize());
|
||||
assertEquals("flushable size should be incremented", expectedSize,
|
||||
store.getFlushableSize().getDataSize());
|
||||
assertEquals("memstoreSize should be incremented",
|
||||
expectedSize, region.getMemStoreDataSize());
|
||||
assertEquals("flushable size should be incremented",
|
||||
expectedSize, store.getFlushableSize().getDataSize());
|
||||
|
||||
region.setCoprocessorHost(null);
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
|
|
|
@ -367,7 +367,7 @@ public class TestHRegionReplayEvents {
|
|||
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
|
||||
long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
|
||||
long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
|
||||
long storeFlushableSize = store.getFlushableSize().getHeapSize();
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
long storeSize = store.getSize();
|
||||
long storeSizeUncompressed = store.getStoreSizeUncompressed();
|
||||
if (flushDesc.getAction() == FlushAction.START_FLUSH) {
|
||||
|
@ -391,8 +391,8 @@ public class TestHRegionReplayEvents {
|
|||
for (HStore s : secondaryRegion.getStores()) {
|
||||
assertEquals(expectedStoreFileCount, s.getStorefilesCount());
|
||||
}
|
||||
long newFlushableSize = store.getFlushableSize().getHeapSize();
|
||||
assertTrue(storeFlushableSize > newFlushableSize);
|
||||
MemStoreSize newMss = store.getFlushableSize();
|
||||
assertTrue(mss.getHeapSize() > newMss.getHeapSize());
|
||||
|
||||
// assert that the region memstore is smaller now
|
||||
long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
|
||||
|
@ -466,7 +466,7 @@ public class TestHRegionReplayEvents {
|
|||
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
|
||||
long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
|
||||
long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
|
||||
long storeFlushableSize = store.getFlushableSize().getHeapSize();
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
|
||||
if (flushDesc.getAction() == FlushAction.START_FLUSH) {
|
||||
startFlushDesc = flushDesc;
|
||||
|
@ -475,7 +475,7 @@ public class TestHRegionReplayEvents {
|
|||
assertNull(result.result);
|
||||
assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
|
||||
assertTrue(regionMemstoreSize > 0);
|
||||
assertTrue(storeFlushableSize > 0);
|
||||
assertTrue(mss.getHeapSize() > 0);
|
||||
|
||||
// assert that the store memstore is smaller now
|
||||
long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
|
||||
|
@ -616,8 +616,8 @@ public class TestHRegionReplayEvents {
|
|||
assertEquals(expectedStoreFileCount, s.getStorefilesCount());
|
||||
}
|
||||
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
|
||||
long newFlushableSize = store.getFlushableSize().getHeapSize();
|
||||
assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
|
||||
|
||||
// assert that the region memstore is same as before
|
||||
long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
|
||||
|
@ -706,8 +706,8 @@ public class TestHRegionReplayEvents {
|
|||
assertEquals(expectedStoreFileCount, s.getStorefilesCount());
|
||||
}
|
||||
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
|
||||
long newFlushableSize = store.getFlushableSize().getHeapSize();
|
||||
assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
|
||||
|
||||
// assert that the region memstore is smaller than before, but not empty
|
||||
long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
|
||||
|
@ -811,12 +811,12 @@ public class TestHRegionReplayEvents {
|
|||
assertEquals(expectedStoreFileCount, s.getStorefilesCount());
|
||||
}
|
||||
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
|
||||
long newFlushableSize = store.getFlushableSize().getHeapSize();
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
if (droppableMemstore) {
|
||||
// assert that the memstore is dropped
|
||||
assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD);
|
||||
assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
|
||||
} else {
|
||||
assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
|
||||
assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
|
||||
}
|
||||
|
||||
// assert that the region memstore is same as before (we could not drop)
|
||||
|
@ -903,8 +903,8 @@ public class TestHRegionReplayEvents {
|
|||
assertEquals(expectedStoreFileCount, s.getStorefilesCount());
|
||||
}
|
||||
Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
|
||||
long newFlushableSize = store.getFlushableSize().getHeapSize();
|
||||
assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD);
|
||||
MemStoreSize mss = store.getFlushableSize();
|
||||
assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
|
||||
|
||||
// assert that the region memstore is empty
|
||||
long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
@Category({VerySlowRegionServerTests.class, LargeTests.class})
|
||||
@SuppressWarnings("deprecation")
|
||||
public class TestHRegionWithInMemoryFlush extends TestHRegion{
|
||||
public class TestHRegionWithInMemoryFlush extends TestHRegion {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
|
|
|
@ -239,8 +239,7 @@ public class TestHStore {
|
|||
*/
|
||||
@Test
|
||||
public void testFlushSizeSizing() throws Exception {
|
||||
LOG.info("Setting up a faulty file system that cannot write in " +
|
||||
this.name.getMethodName());
|
||||
LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
|
||||
final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||
// Only retry once.
|
||||
conf.setInt("hbase.hstore.flush.retries.number", 1);
|
||||
|
@ -259,15 +258,15 @@ public class TestHStore {
|
|||
// Initialize region
|
||||
init(name.getMethodName(), conf);
|
||||
|
||||
MemStoreSize size = store.memstore.getFlushableSize();
|
||||
assertEquals(0, size.getDataSize());
|
||||
MemStoreSize mss = store.memstore.getFlushableSize();
|
||||
assertEquals(0, mss.getDataSize());
|
||||
LOG.info("Adding some data");
|
||||
MemStoreSizing kvSize = new MemStoreSizing();
|
||||
MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
|
||||
store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
|
||||
// add the heap size of active (mutable) segment
|
||||
kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0);
|
||||
size = store.memstore.getFlushableSize();
|
||||
assertEquals(kvSize, size);
|
||||
mss = store.memstore.getFlushableSize();
|
||||
assertEquals(kvSize.getMemStoreSize(), mss);
|
||||
// Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
|
||||
try {
|
||||
LOG.info("Flushing");
|
||||
|
@ -279,23 +278,23 @@ public class TestHStore {
|
|||
// due to snapshot, change mutable to immutable segment
|
||||
kvSize.incMemStoreSize(0,
|
||||
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD, 0);
|
||||
size = store.memstore.getFlushableSize();
|
||||
assertEquals(kvSize, size);
|
||||
MemStoreSizing kvSize2 = new MemStoreSizing();
|
||||
mss = store.memstore.getFlushableSize();
|
||||
assertEquals(kvSize.getMemStoreSize(), mss);
|
||||
MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
|
||||
store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
|
||||
kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0);
|
||||
// Even though we add a new kv, we expect the flushable size to be 'same' since we have
|
||||
// not yet cleared the snapshot -- the above flush failed.
|
||||
assertEquals(kvSize, size);
|
||||
assertEquals(kvSize.getMemStoreSize(), mss);
|
||||
ffs.fault.set(false);
|
||||
flushStore(store, id++);
|
||||
size = store.memstore.getFlushableSize();
|
||||
mss = store.memstore.getFlushableSize();
|
||||
// Size should be the foreground kv size.
|
||||
assertEquals(kvSize2, size);
|
||||
assertEquals(kvSize2.getMemStoreSize(), mss);
|
||||
flushStore(store, id++);
|
||||
size = store.memstore.getFlushableSize();
|
||||
assertEquals(0, size.getDataSize());
|
||||
assertEquals(MutableSegment.DEEP_OVERHEAD, size.getHeapSize());
|
||||
mss = store.memstore.getFlushableSize();
|
||||
assertEquals(0, mss.getDataSize());
|
||||
assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
@ -1225,7 +1224,7 @@ public class TestHStore {
|
|||
byte[] value0 = Bytes.toBytes("value0");
|
||||
byte[] value1 = Bytes.toBytes("value1");
|
||||
byte[] value2 = Bytes.toBytes("value2");
|
||||
MemStoreSizing memStoreSizing = new MemStoreSizing();
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
long seqId = 100;
|
||||
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
|
||||
|
@ -1284,7 +1283,7 @@ public class TestHStore {
|
|||
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
||||
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
||||
byte[] value = Bytes.toBytes("value");
|
||||
MemStoreSizing memStoreSizing = new MemStoreSizing();
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
long seqId = 100;
|
||||
// older data whihc shouldn't be "seen" by client
|
||||
|
@ -1362,7 +1361,7 @@ public class TestHStore {
|
|||
});
|
||||
byte[] oldValue = Bytes.toBytes("oldValue");
|
||||
byte[] currentValue = Bytes.toBytes("currentValue");
|
||||
MemStoreSizing memStoreSizing = new MemStoreSizing();
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
long seqId = 100;
|
||||
// older data whihc shouldn't be "seen" by client
|
||||
|
@ -1478,7 +1477,7 @@ public class TestHStore {
|
|||
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
||||
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
||||
byte[] value = Bytes.toBytes("thisisavarylargevalue");
|
||||
MemStoreSizing memStoreSizing = new MemStoreSizing();
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
long seqId = 100;
|
||||
// older data whihc shouldn't be "seen" by client
|
||||
|
@ -1600,7 +1599,7 @@ public class TestHStore {
|
|||
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
|
||||
// Set the lower threshold to invoke the "MERGE" policy
|
||||
MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
|
||||
MemStoreSizing memStoreSizing = new MemStoreSizing();
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
long ts = System.currentTimeMillis();
|
||||
long seqID = 1L;
|
||||
// Add some data to the region and do some flushes
|
||||
|
|
Loading…
Reference in New Issue