diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 9f4fd2f8c4b..b82afba93d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -165,13 +165,7 @@ public abstract class AbstractMemStore implements MemStore { @Override public MemStoreSize getSnapshotSize() { - return getSnapshotSizing(); - } - - MemStoreSizing getSnapshotSizing() { - return new MemStoreSizing(this.snapshot.keySize(), - this.snapshot.heapSize(), - this.snapshot.offHeapSize()); + return this.snapshot.getMemStoreSize(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java index 6af84cb5ba4..9e206ea67e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java @@ -40,7 +40,7 @@ public class CSLMImmutableSegment extends ImmutableSegment { super(segment); // update the segment metadata heap size long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM; - incSize(0, indexOverhead, 0); // CSLM is always on-heap + incMemStoreSize(0, indexOverhead, 0); // CSLM is always on-heap } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java index 46312002634..dadfc48633a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java @@ -45,7 +45,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment { protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) { super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL - incSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap + incMemStoreSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment initializeCellSet(numOfCells, iterator, action); } @@ -59,7 +59,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment { MemStoreCompactionStrategy.Action action) { super(segment); // initiailize the upper class long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; - incSize(0, indexOverhead, 0); // CAM is always on-heap + incMemStoreSize(0, indexOverhead, 0); // CAM is always on-heap int numOfCells = segment.getCellsCount(); // build the new CellSet based on CellChunkMap and update the CellSet of this Segment reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(), @@ -67,7 +67,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment { // arrange the meta-data size, decrease all meta-data sizes related to SkipList; // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes) long newSegmentSizeDelta = 
numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); - incSize(0, newSegmentSizeDelta, 0); + incMemStoreSize(0, newSegmentSizeDelta, 0); memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java index 0eebfb54586..e2f82051c75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java @@ -58,9 +58,9 @@ public class CellChunkImmutableSegment extends ImmutableSegment { boolean onHeap = getMemStoreLAB().isOnHeap(); // initiate the heapSize with the size of the segment metadata if(onHeap) { - incSize(0, indexOverhead, 0); + incMemStoreSize(0, indexOverhead, 0); } else { - incSize(0, 0, indexOverhead); + incMemStoreSize(0, 0, indexOverhead); } // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment initializeCellSet(numOfCells, iterator, action); @@ -79,9 +79,9 @@ public class CellChunkImmutableSegment extends ImmutableSegment { boolean onHeap = getMemStoreLAB().isOnHeap(); // initiate the heapSize with the size of the segment metadata if(onHeap) { - incSize(0, indexOverhead, 0); + incMemStoreSize(0, indexOverhead, 0); } else { - incSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM); + incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM); } int numOfCells = segment.getCellsCount(); // build the new CellSet based on CellChunkMap @@ -92,10 +92,10 @@ public class CellChunkImmutableSegment extends ImmutableSegment { // (reinitializeCellSet doesn't take the care for the sizes) long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); if(onHeap) { - incSize(0, newSegmentSizeDelta, 0); + incMemStoreSize(0, newSegmentSizeDelta, 0); memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0); } else { - incSize(0, 0, newSegmentSizeDelta); + incMemStoreSize(0, 0, newSegmentSizeDelta); memstoreSizing.incMemStoreSize(0, 0, newSegmentSizeDelta); } @@ -333,7 +333,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { long heapOverhead = newHeapSize - oldHeapSize; long offHeapOverhead = newOffHeapSize - oldOffHeapSize; //TODO: maybe need to update the dataSize of the region - incSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead); + incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead); return cell; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index c1b061a01c0..fc3e5ba6692 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -155,12 +155,12 @@ public class CompactingMemStore extends AbstractMemStore { */ @Override public MemStoreSize size() { - MemStoreSizing memstoreSizing = new MemStoreSizing(); + MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing(); memstoreSizing.incMemStoreSize(active.getMemStoreSize()); for (Segment item : pipeline.getSegments()) { memstoreSizing.incMemStoreSize(item.getMemStoreSize()); } - return memstoreSizing; + return memstoreSizing.getMemStoreSize(); } 
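A note on the accounting pattern used just above in CompactingMemStore#size(): sizes are now summed into a local, non-thread-safe MemStoreSizing and only the immutable MemStoreSize snapshot is handed back to callers. A minimal sketch of that idiom against the new API follows; the active/pipeline variables simply stand in for whatever is being summed.

    // Accumulate locally; no synchronization needed because nothing else sees 'sizing'.
    MemStoreSizing sizing = new NonThreadSafeMemStoreSizing();
    sizing.incMemStoreSize(active.getMemStoreSize());
    for (Segment segment : pipeline.getSegments()) {
      sizing.incMemStoreSize(segment.getMemStoreSize());
    }
    // Publish a read-only snapshot; callers cannot mutate the counters through it.
    MemStoreSize mss = sizing.getMemStoreSize();
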
/** @@ -216,42 +216,38 @@ public class CompactingMemStore extends AbstractMemStore { return new MemStoreSnapshot(snapshotId, this.snapshot); } - /** - * On flush, how much memory we will clear. - * @return size of data that is going to be flushed - */ @Override public MemStoreSize getFlushableSize() { - MemStoreSizing snapshotSizing = getSnapshotSizing(); - if (snapshotSizing.getDataSize() == 0) { + MemStoreSize mss = getSnapshotSize(); + if (mss.getDataSize() == 0) { // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed if (compositeSnapshot) { - snapshotSizing = pipeline.getPipelineSizing(); - snapshotSizing.incMemStoreSize(active.getMemStoreSize()); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize()); + memStoreSizing.incMemStoreSize(this.active.getMemStoreSize()); + mss = memStoreSizing.getMemStoreSize(); } else { - snapshotSizing = pipeline.getTailSizing(); + mss = pipeline.getTailSize(); } } - return snapshotSizing.getDataSize() > 0 ? snapshotSizing - : new MemStoreSize(active.getMemStoreSize()); + return mss.getDataSize() > 0? mss: this.active.getMemStoreSize(); } @Override protected long keySize() { - // Need to consider keySize of all segments in pipeline and active - long k = this.active.keySize(); + // Need to consider dataSize/keySize of all segments in pipeline and active + long keySize = this.active.getDataSize(); for (Segment segment : this.pipeline.getSegments()) { - k += segment.keySize(); + keySize += segment.getDataSize(); } - return k; + return keySize; } @Override protected long heapSize() { // Need to consider heapOverhead of all segments in pipeline and active - long h = this.active.heapSize(); + long h = this.active.getHeapSize(); for (Segment segment : this.pipeline.getSegments()) { - h += segment.heapSize(); + h += segment.getHeapSize(); } return h; } @@ -447,7 +443,7 @@ public class CompactingMemStore extends AbstractMemStore { @VisibleForTesting protected boolean shouldFlushInMemory() { - if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold + if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush return false; // regardless the size } @@ -571,7 +567,7 @@ public class CompactingMemStore extends AbstractMemStore { // debug method public void debug() { - String msg = "active size=" + this.active.keySize(); + String msg = "active size=" + this.active.getDataSize(); msg += " in-memory flush size is "+ inmemoryFlushSize; msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false"); msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? 
"true" : "false"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 907df72ee16..f8aa3ef73fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -23,11 +23,12 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; /** * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. @@ -135,19 +136,21 @@ public class CompactionPipeline { // update the global memstore size counter long suffixDataSize = getSegmentsKeySize(suffix); long newDataSize = 0; - if(segment != null) newDataSize = segment.keySize(); + if(segment != null) { + newDataSize = segment.getDataSize(); + } long dataSizeDelta = suffixDataSize - newDataSize; long suffixHeapSize = getSegmentsHeapSize(suffix); long suffixOffHeapSize = getSegmentsOffHeapSize(suffix); long newHeapSize = 0; long newOffHeapSize = 0; if(segment != null) { - newHeapSize = segment.heapSize(); - newOffHeapSize = segment.offHeapSize(); + newHeapSize = segment.getHeapSize(); + newOffHeapSize = segment.getOffHeapSize(); } long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize; long heapSizeDelta = suffixHeapSize - newHeapSize; - region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta)); + region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta); LOG.debug("Suffix data size={}, new segment data size={}, " + "suffix heap size={}," + "new segment heap size={}" + "suffix off heap size={}," + "new segment off heap size={}" @@ -164,7 +167,7 @@ public class CompactionPipeline { private static long getSegmentsHeapSize(List list) { long res = 0; for (Segment segment : list) { - res += segment.heapSize(); + res += segment.getHeapSize(); } return res; } @@ -172,7 +175,7 @@ public class CompactionPipeline { private static long getSegmentsOffHeapSize(List list) { long res = 0; for (Segment segment : list) { - res += segment.offHeapSize(); + res += segment.getOffHeapSize(); } return res; } @@ -180,7 +183,7 @@ public class CompactionPipeline { private static long getSegmentsKeySize(List list) { long res = 0; for (Segment segment : list) { - res += segment.keySize(); + res += segment.getDataSize(); } return res; } @@ -211,15 +214,17 @@ public class CompactionPipeline { int i = 0; for (ImmutableSegment s : pipeline) { if ( s.canBeFlattened() ) { - MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated + // size to be updated + MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing(); ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( (CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action); replaceAtIndex(i,newS); if(region != null) { - // update the global memstore size counter - // upon flattening there is no change in the data size - region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize(), - 
newMemstoreAccounting.getOffHeapSize())); + // Update the global memstore size counter upon flattening there is no change in the + // data size + MemStoreSize mss = newMemstoreAccounting.getMemStoreSize(); + Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!"); + region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); } LOG.debug("Compaction pipeline segment {} flattened", s); return true; @@ -254,19 +259,18 @@ public class CompactionPipeline { return minSequenceId; } - public MemStoreSizing getTailSizing() { + public MemStoreSize getTailSize() { LinkedList localCopy = readOnlyCopy; - if (localCopy.isEmpty()) return new MemStoreSizing(); - return new MemStoreSizing(localCopy.peekLast().getMemStoreSize()); + return localCopy.isEmpty()? new MemStoreSize(): localCopy.peekLast().getMemStoreSize(); } - public MemStoreSizing getPipelineSizing() { - MemStoreSizing memStoreSizing = new MemStoreSizing(); + public MemStoreSize getPipelineSize() { + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); LinkedList localCopy = readOnlyCopy; for (Segment segment : localCopy) { memStoreSizing.incMemStoreSize(segment.getMemStoreSize()); } - return memStoreSizing; + return memStoreSizing.getMemStoreSize(); } private void swapSuffix(List suffix, ImmutableSegment segment, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index 98e09814925..1fd2f23a3a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -48,7 +48,7 @@ public class CompositeImmutableSegment extends ImmutableSegment { for (ImmutableSegment s : segments) { this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax()); this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin()); - this.keySize += s.keySize(); + this.keySize += s.getDataSize(); } } @@ -170,7 +170,7 @@ public class CompositeImmutableSegment extends ImmutableSegment { * @return Sum of all cell sizes. */ @Override - public long keySize() { + public long getDataSize() { return this.keySize; } @@ -178,10 +178,10 @@ public class CompositeImmutableSegment extends ImmutableSegment { * @return The heap size of this segment. 
*/ @Override - public long heapSize() { + public long getHeapSize() { long result = 0; for (ImmutableSegment s : segments) { - result += s.heapSize(); + result += s.getHeapSize(); } return result; } @@ -190,7 +190,7 @@ public class CompositeImmutableSegment extends ImmutableSegment { * Updates the heap size counter of the segment by the given delta */ @Override - protected void incSize(long delta, long heapOverhead, long offHeapOverhead) { + public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) { throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index ddeaddf2304..5dcf48bb7eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -97,26 +97,20 @@ public class DefaultMemStore extends AbstractMemStore { return new MemStoreSnapshot(this.snapshotId, this.snapshot); } - /** - * On flush, how much memory we will clear from the active cell set. - * - * @return size of data that is going to be flushed from active set - */ @Override public MemStoreSize getFlushableSize() { - MemStoreSize snapshotSize = getSnapshotSize(); - return snapshotSize.getDataSize() > 0 ? snapshotSize - : new MemStoreSize(active.getMemStoreSize()); + MemStoreSize mss = getSnapshotSize(); + return mss.getDataSize() > 0? mss: this.active.getMemStoreSize(); } @Override protected long keySize() { - return this.active.keySize(); + return this.active.getDataSize(); } @Override protected long heapSize() { - return this.active.heapSize(); + return this.active.getHeapSize(); } @Override @@ -154,7 +148,7 @@ public class DefaultMemStore extends AbstractMemStore { @Override public MemStoreSize size() { - return new MemStoreSize(active.getMemStoreSize()); + return active.getMemStoreSize(); } /** @@ -193,26 +187,27 @@ public class DefaultMemStore extends AbstractMemStore { byte [] fam = Bytes.toBytes("col"); byte [] qf = Bytes.toBytes("umn"); byte [] empty = new byte[0]; - MemStoreSizing memstoreSizing = new MemStoreSizing(); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); for (int i = 0; i < count; i++) { // Give each its own ts - memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing); + memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); } - LOG.info("memstore1 estimated size=" - + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize())); + LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() + + memStoreSizing.getMemStoreSize().getHeapSize()); for (int i = 0; i < count; i++) { - memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing); + memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); } - LOG.info("memstore1 estimated size (2nd loading of same data)=" - + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize())); + LOG.info("memstore1 estimated size (2nd loading of same data)={}", + memStoreSizing.getMemStoreSize().getDataSize() + + memStoreSizing.getMemStoreSize().getHeapSize()); // Make a variably sized memstore. 
DefaultMemStore memstore2 = new DefaultMemStore(); - memstoreSizing = new MemStoreSizing(); + memStoreSizing = new NonThreadSafeMemStoreSizing(); for (int i = 0; i < count; i++) { - memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSizing); + memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing); } - LOG.info("memstore2 estimated size=" - + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize())); + LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() + + memStoreSizing.getMemStoreSize().getHeapSize()); final int seconds = 30; LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); LOG.info("Exiting."); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1fb6afe7ec8..49263984057 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -292,7 +292,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private Map coprocessorServiceHandlers = Maps.newHashMap(); // Track data size in all memstores - private final MemStoreSizing memStoreSize = new MemStoreSizing(); + private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this); // Debug possible data loss due to WAL off @@ -1210,36 +1210,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Increase the size of mem store in this region and the size of global mem * store */ - public void incMemStoreSize(MemStoreSize memStoreSize) { - if (this.rsAccounting != null) { - rsAccounting.incGlobalMemStoreSize(memStoreSize); - } - long dataSize; - synchronized (this.memStoreSize) { - this.memStoreSize.incMemStoreSize(memStoreSize); - dataSize = this.memStoreSize.getDataSize(); - } - checkNegativeMemStoreDataSize(dataSize, memStoreSize.getDataSize()); + void incMemStoreSize(MemStoreSize mss) { + incMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); } - public void decrMemStoreSize(MemStoreSize memStoreSize) { + void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { if (this.rsAccounting != null) { - rsAccounting.decGlobalMemStoreSize(memStoreSize); + rsAccounting.incGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta); } - long size; - synchronized (this.memStoreSize) { - this.memStoreSize.decMemStoreSize(memStoreSize); - size = this.memStoreSize.getDataSize(); + long dataSize = + this.memStoreSizing.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta); + checkNegativeMemStoreDataSize(dataSize, dataSizeDelta); + } + + void decrMemStoreSize(MemStoreSize mss) { + decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); + } + + void decrMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { + if (this.rsAccounting != null) { + rsAccounting.decGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta); } - checkNegativeMemStoreDataSize(size, -memStoreSize.getDataSize()); + long dataSize = + this.memStoreSizing.decMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta); + checkNegativeMemStoreDataSize(dataSize, -dataSizeDelta); } private void checkNegativeMemStoreDataSize(long memStoreDataSize, 
long delta) { - // This is extremely bad if we make memStoreSize negative. Log as much info on the offending - // caller as possible. (memStoreSize might be a negative value already -- freeing memory) + // This is extremely bad if we make memStoreSizing negative. Log as much info on the offending + // caller as possible. (memStoreSizing might be a negative value already -- freeing memory) if (memStoreDataSize < 0) { LOG.error("Asked to modify this region's (" + this.toString() - + ") memStoreSize to a negative value which is incorrect. Current memStoreSize=" + + ") memStoreSizing to a negative value which is incorrect. Current memStoreSizing=" + (memStoreDataSize - delta) + ", delta=" + delta, new Exception()); } } @@ -1274,17 +1276,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public long getMemStoreDataSize() { - return memStoreSize.getDataSize(); + return memStoreSizing.getDataSize(); } @Override public long getMemStoreHeapSize() { - return memStoreSize.getHeapSize(); + return memStoreSizing.getHeapSize(); } @Override public long getMemStoreOffHeapSize() { - return memStoreSize.getOffHeapSize(); + return memStoreSizing.getOffHeapSize(); } /** @return store services for this region, to access services required by store level needs */ @@ -1555,7 +1557,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int failedfFlushCount = 0; int flushCount = 0; long tmp = 0; - long remainingSize = this.memStoreSize.getDataSize(); + long remainingSize = this.memStoreSizing.getDataSize(); while (remainingSize > 0) { try { internalFlushcache(status); @@ -1564,7 +1566,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi " (carrying snapshot?) " + this); } flushCount++; - tmp = this.memStoreSize.getDataSize(); + tmp = this.memStoreSizing.getDataSize(); if (tmp >= remainingSize) { failedfFlushCount++; } @@ -1598,13 +1600,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // close each store in parallel for (HStore store : stores.values()) { - MemStoreSize flushableSize = store.getFlushableSize(); - if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) { + MemStoreSize mss = store.getFlushableSize(); + if (!(abort || mss.getDataSize() == 0 || writestate.readOnly)) { if (getRegionServerServices() != null) { getRegionServerServices().abort("Assertion failed while closing store " + getRegionInfo().getRegionNameAsString() + " " + store - + ". flushableSize expected=0, actual= " + flushableSize - + ". Current memStoreSize=" + getMemStoreDataSize() + ". Maybe a coprocessor " + + ". flushableSize expected=0, actual={" + mss + + "}. Current memStoreSize=" + this.memStoreSizing.getMemStoreSize() + + ". 
Maybe a coprocessor " + "operation failed and left the memstore in a partially updated state.", null); } } @@ -1647,9 +1650,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.closed.set(true); if (!canFlush) { - this.decrMemStoreSize(new MemStoreSize(memStoreSize)); - } else if (memStoreSize.getDataSize() != 0) { - LOG.error("Memstore data size is " + memStoreSize.getDataSize()); + decrMemStoreSize(this.memStoreSizing.getMemStoreSize()); + } else if (this.memStoreSizing.getDataSize() != 0) { + LOG.error("Memstore data size is {}", this.memStoreSizing.getDataSize()); } if (coprocessorHost != null) { status.setStatus("Running coprocessor post-close hooks"); @@ -1782,7 +1785,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return True if its worth doing a flush before we put up the close flag. */ private boolean worthPreFlushing() { - return this.memStoreSize.getDataSize() > + return this.memStoreSizing.getDataSize() > this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5); } @@ -2400,12 +2403,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // bulk loaded file between memory and existing hfiles. It wants a good seqeunceId that belongs // to no other that it can use to associate with the bulk load. Hence this little dance below // to go get one. - if (this.memStoreSize.getDataSize() <= 0) { + if (this.memStoreSizing.getDataSize() <= 0) { // Take an update lock so no edits can come into memory just yet. this.updatesLock.writeLock().lock(); WriteEntry writeEntry = null; try { - if (this.memStoreSize.getDataSize() <= 0) { + if (this.memStoreSizing.getDataSize() <= 0) { // Presume that if there are still no edits in the memstore, then there are no edits for // this region out in the WAL subsystem so no need to do any trickery clearing out // edits in the WAL sub-system. 
Up the sequence number so the resulting flush id is for @@ -2447,7 +2450,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName()); - MemStoreSizing totalSizeOfFlushableStores = new MemStoreSizing(); + MemStoreSizing totalSizeOfFlushableStores = new NonThreadSafeMemStoreSizing(); Map flushedFamilyNamesToSeq = new HashMap<>(); for (HStore store : storesToFlush) { @@ -2536,14 +2539,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!isAllFamilies(storesToFlush)) { perCfExtras = new StringBuilder(); for (HStore store: storesToFlush) { + MemStoreSize mss = store.getFlushableSize(); perCfExtras.append("; ").append(store.getColumnFamilyName()); - perCfExtras.append("=") - .append(StringUtils.byteDesc(store.getFlushableSize().getDataSize())); + perCfExtras.append("={dataSize=") + .append(StringUtils.byteDesc(mss.getDataSize())); + perCfExtras.append(", heapSize=") + .append(StringUtils.byteDesc(mss.getHeapSize())); + perCfExtras.append(", offHeapSize=") + .append(StringUtils.byteDesc(mss.getOffHeapSize())); + perCfExtras.append("}"); } } + MemStoreSize mss = this.memStoreSizing.getMemStoreSize(); LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," + - " memstore data size=" + StringUtils.byteDesc(this.memStoreSize.getDataSize()) + - " memstore heap size=" + StringUtils.byteDesc(this.memStoreSize.getHeapSize()) + + " memstore data size=" + StringUtils.byteDesc(mss.getDataSize()) + + " memstore heap size=" + StringUtils.byteDesc(mss.getHeapSize()) + ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId)); } @@ -2663,7 +2673,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi storeFlushCtxs.clear(); // Set down the memstore size by amount of flush. - this.decrMemStoreSize(prepareResult.totalFlushableSize); + MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize(); + this.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); // Increase the size of this Region for the purposes of quota. Noop if quotas are disabled. // During startup, quota manager may not be initialized yet. 
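For reference while reading the HRegion changes: the region-level counter becomes a ThreadSafeMemStoreSizing, whose incMemStoreSize(...) returns the new data size so checkNegativeMemStoreDataSize(...) can run without the old synchronized block. That class is not included in this section, so the following is only a sketch of what such an implementation could look like; the class name and fields are assumptions, and it presumes the same package as MemStoreSize (whose constructor is package-private in this patch).

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical sketch; the real ThreadSafeMemStoreSizing is not shown in this diff.
    class ThreadSafeMemStoreSizingSketch implements MemStoreSizing {
      private final AtomicLong dataSize = new AtomicLong();
      private final AtomicLong heapSize = new AtomicLong();
      private final AtomicLong offHeapSize = new AtomicLong();

      @Override
      public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
        heapSize.addAndGet(heapSizeDelta);
        offHeapSize.addAndGet(offHeapSizeDelta);
        // Returning the new dataSize lets callers such as HRegion check for negative totals.
        return dataSize.addAndGet(dataSizeDelta);
      }

      @Override public long getDataSize() { return dataSize.get(); }
      @Override public long getHeapSize() { return heapSize.get(); }
      @Override public long getOffHeapSize() { return offHeapSize.get(); }

      @Override
      public MemStoreSize getMemStoreSize() {
        // The snapshot is not atomic across the three counters, matching the lock-free intent.
        return new MemStoreSize(getDataSize(), getHeapSize(), getOffHeapSize());
      }
    }
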
@@ -2740,12 +2751,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } long time = EnvironmentEdgeManager.currentTime() - startTime; - long flushableDataSize = prepareResult.totalFlushableSize.getDataSize(); - long flushableHeapSize = prepareResult.totalFlushableSize.getHeapSize(); - long memstoresize = this.memStoreSize.getDataSize(); + MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize(); + long memstoresize = this.memStoreSizing.getMemStoreSize().getDataSize(); String msg = "Finished memstore flush;" - + " data size ~" + StringUtils.byteDesc(flushableDataSize) + "/" + flushableDataSize - + ", heap size ~" + StringUtils.byteDesc(flushableHeapSize) + "/" + flushableHeapSize + + " data size ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize() + + ", heap size ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize() + ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + " for " + this.getRegionInfo().getEncodedName() + " in " + time + "ms, sequenceid=" + flushOpSeqId + ", compaction requested=" + compactionRequested @@ -2755,7 +2765,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (rsServices != null && rsServices.getMetrics() != null) { rsServices.getMetrics().updateFlush(time - startTime, - prepareResult.totalFlushableSize.getDataSize(), flushedOutputFileSize); + mss.getDataSize(), flushedOutputFileSize); } return new FlushResultImpl(compactionRequested ? @@ -3067,7 +3077,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected void writeMiniBatchOperationsToMemStore( final MiniBatchOperationInProgress miniBatchOp, final long writeNumber) throws IOException { - MemStoreSizing memStoreAccounting = new MemStoreSizing(); + MemStoreSizing memStoreAccounting = new NonThreadSafeMemStoreSizing(); visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> { // We need to update the sequence id for following reasons. // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. @@ -3080,7 +3090,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return true; }); // update memStore size - region.incMemStoreSize(memStoreAccounting); + region.incMemStoreSize(memStoreAccounting.getDataSize(), memStoreAccounting.getHeapSize(), + memStoreAccounting.getOffHeapSize()); } public boolean isDone() { @@ -4274,8 +4285,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If catalog region, do not impose resource constraints or block updates. if (this.getRegionInfo().isMetaRegion()) return; - if (this.memStoreSize.getHeapSize() - + this.memStoreSize.getOffHeapSize() > this.blockingMemStoreSize) { + MemStoreSize mss = this.memStoreSizing.getMemStoreSize(); + if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) { blockedRequestsCount.increment(); requestFlush(); // Don't print current limit because it will vary too much. The message is used as a key @@ -4645,7 +4656,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } boolean flush = false; - MemStoreSizing memstoreSize = new MemStoreSizing(); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); for (Cell cell: val.getCells()) { // Check this edit is for me. 
Also, guard against writing the special // METACOLUMN info such as HBASE::CACHEFLUSH entries @@ -4688,15 +4699,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } PrivateCellUtil.setSequenceId(cell, currentReplaySeqId); - restoreEdit(store, cell, memstoreSize); + restoreEdit(store, cell, memStoreSizing); editsCount++; } + MemStoreSize mss = memStoreSizing.getMemStoreSize(); if (this.rsAccounting != null) { - rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), - memstoreSize); + rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss); } - incMemStoreSize(memstoreSize); - flush = isFlushSize(this.memStoreSize); + incMemStoreSize(mss); + flush = isFlushSize(this.memStoreSizing.getMemStoreSize()); if (flush) { internalFlushcache(null, currentEditSeqId, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); @@ -5006,8 +5017,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi replayFlushInStores(flush, prepareFlushResult, true); // Set down the memstore size by amount of flush. - this.decrMemStoreSize(prepareFlushResult.totalFlushableSize); - + this.decrMemStoreSize(prepareFlushResult.totalFlushableSize.getMemStoreSize()); this.prepareFlushResult = null; writestate.flushing = false; } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) { @@ -5039,7 +5049,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi replayFlushInStores(flush, prepareFlushResult, true); // Set down the memstore size by amount of flush. - this.decrMemStoreSize(prepareFlushResult.totalFlushableSize); + this.decrMemStoreSize(prepareFlushResult.totalFlushableSize.getMemStoreSize()); // Inspect the memstore contents to see whether the memstore contains only edits // with seqId smaller than the flush seqId. If so, we can discard those edits. @@ -5143,7 +5153,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException */ private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException { - MemStoreSizing totalFreedSize = new MemStoreSizing(); + MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing(); this.updatesLock.writeLock().lock(); try { @@ -5170,7 +5180,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } finally { this.updatesLock.writeLock().unlock(); } - return totalFreedSize; + return totalFreedSize.getMemStoreSize(); } private MemStoreSize doDropStoreMemStoreContentsForSeqId(HStore s, long currentSeqId) @@ -5293,9 +5303,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ? 
null : this.prepareFlushResult.storeFlushCtxs.get(family); if (ctx != null) { - MemStoreSize snapshotSize = store.getFlushableSize(); + MemStoreSize mss = store.getFlushableSize(); ctx.abort(); - this.decrMemStoreSize(snapshotSize); + this.decrMemStoreSize(mss); this.prepareFlushResult.storeFlushCtxs.remove(family); } } @@ -5487,12 +5497,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi null : this.prepareFlushResult.storeFlushCtxs.get( store.getColumnFamilyDescriptor().getName()); if (ctx != null) { - MemStoreSize snapshotSize = store.getFlushableSize(); + MemStoreSize mss = store.getFlushableSize(); ctx.abort(); - this.decrMemStoreSize(snapshotSize); - this.prepareFlushResult.storeFlushCtxs.remove( - store.getColumnFamilyDescriptor().getName()); - totalFreedDataSize += snapshotSize.getDataSize(); + this.decrMemStoreSize(mss); + this.prepareFlushResult.storeFlushCtxs. + remove(store.getColumnFamilyDescriptor().getName()); + totalFreedDataSize += mss.getDataSize(); } } } @@ -7374,8 +7384,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return null; } ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder(); - stats.setMemStoreLoad((int) (Math.min(100, (this.memStoreSize.getHeapSize() * 100) / this - .memstoreFlushSize))); + stats.setMemStoreLoad((int) (Math.min(100, + (this.memStoreSizing.getMemStoreSize().getHeapSize() * 100) / this.memstoreFlushSize))); if (rsServices.getHeapMemoryManager() != null) { // the HeapMemoryManager uses -0.0 to signal a problem asking the JVM, // so we could just do the calculation below and we'll get a 0. @@ -7436,7 +7446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // This is assigned by mvcc either explicity in the below or in the guts of the WAL append // when it assigns the edit a sequencedid (A.K.A the mvcc write number). WriteEntry writeEntry = null; - MemStoreSizing memstoreAccounting = new MemStoreSizing(); + MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); try { boolean success = false; try { @@ -7522,7 +7532,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } finally { closeRegionOperation(); if (!mutations.isEmpty()) { - this.incMemStoreSize(memstoreAccounting); + this.incMemStoreSize(memstoreAccounting.getMemStoreSize()); requestFlushIfNeeded(); } } @@ -7626,7 +7636,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(op); List results = returnResults? new ArrayList<>(mutation.size()): null; RowLock rowLock = null; - MemStoreSizing memstoreAccounting = new MemStoreSizing(); + MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); try { rowLock = getRowLockInternal(mutation.getRow(), false, null); lock(this.updatesLock.readLock()); @@ -7676,7 +7686,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowLock.release(); } // Request a cache flush if over the limit. Do it outside update lock. 
- incMemStoreSize(memstoreAccounting); + incMemStoreSize(memstoreAccounting.getMemStoreSize()); requestFlushIfNeeded(); closeRegionOperation(op); if (this.metricsRegion != null) { @@ -8557,7 +8567,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } private void requestFlushIfNeeded() throws RegionTooBusyException { - if(isFlushSize(memStoreSize)) { + if(isFlushSize(this.memStoreSizing.getMemStoreSize())) { requestFlush(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 9e89e5e9cc9..9494e18dad9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2272,7 +2272,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat this.cacheFlushCount = snapshot.getCellsCount(); this.cacheFlushSize = snapshot.getDataSize(); committedFiles = new ArrayList<>(1); - return new MemStoreSize(snapshot.getMemStoreSize()); + return snapshot.getMemStoreSize(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index af7e5d570c3..910eaed48ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -50,12 +50,11 @@ public interface MemStore { void clearSnapshot(long id) throws UnexpectedStateException; /** - * On flush, how much memory we will clear. * Flush will first clear out the data in snapshot if any (It will take a second flush * invocation to clear the current Cell set). If snapshot is empty, current * Cell set will be flushed. * - * @return size of data that is going to be flushed + * @return On flush, how much memory we will clear. */ MemStoreSize getFlushableSize(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index ad8aa46d813..2a49797e52c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -555,8 +555,9 @@ class MemStoreFlusher implements FlushRequester { // If this is first time we've been put off, then emit a log message. if (fqe.getRequeueCount() <= 0) { // Note: We don't impose blockingStoreFiles constraint on meta regions - LOG.warn("Region " + region.getRegionInfo().getEncodedName() + " has too many " + - "store files; delaying flush up to " + this.blockingWaitTime + "ms"); + LOG.warn("{} has too many store files({}); delaying flush up to {} ms", + region.getRegionInfo().getEncodedName(), getStoreFileCount(region), + this.blockingWaitTime); if (!this.server.compactSplitThread.requestSplit(region)) { try { this.server.compactSplitThread.requestSystemCompaction(region, @@ -677,6 +678,14 @@ class MemStoreFlusher implements FlushRequester { return false; } + private int getStoreFileCount(Region region) { + int count = 0; + for (Store store : region.getStores()) { + count += store.getStorefilesCount(); + } + return count; + } + /** * Check if the regionserver's memstore memory usage is greater than the * limit. 
If so, flush regions with the biggest memstores until we're down @@ -760,10 +769,10 @@ class MemStoreFlusher implements FlushRequester { } } - private void logMsg(String string1, long val, long max) { - LOG.info("Blocking updates on " + server.toString() + ": " + string1 + " " - + TraditionalBinaryPrefix.long2String(val, "", 1) + " is >= than blocking " - + TraditionalBinaryPrefix.long2String(max, "", 1) + " size"); + private void logMsg(String type, long val, long max) { + LOG.info("Blocking updates: {} {} is >= blocking {}", type, + TraditionalBinaryPrefix.long2String(val, "", 1), + TraditionalBinaryPrefix.long2String(max, "", 1)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java index ec79e8d3d5e..97a416efe5c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java @@ -19,64 +19,56 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; - /** - * Reports the data size part and total heap space occupied by the MemStore. - * Read-only. + * Data structure of three longs. + * Convenient package in which to carry current state of three counters. + *

+ * Immutable!
* @see MemStoreSizing */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) public class MemStoreSize { - // MemStore size tracks 3 sizes: - // (1) data size: the aggregated size of all key-value not including meta data such as - // index, time range etc. - // (2) heap size: the aggregated size of all data that is allocated on-heap including all - // key-values that reside on-heap and the metadata that resides on-heap - // (3) off-heap size: the aggregated size of all data that is allocated off-heap including all - // key-values that reside off-heap and the metadata that resides off-heap - // - // 3 examples to illustrate their usage: - // Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated - // on-heap. The counters are <100MB, 120MB, 0>, respectively. - // Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata - // allocated on-heap (e.g, CAM index). The counters are <100MB, 20MB, 100MB>, respectively. - // Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB - // are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated - // off-heap (e.g, CCM index) and 5MB allocated on-heap (e.g, CSLM index in active). - // The counters are <100MB, 10MB, 110MB>, respectively. - /** *'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can - * be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap - * or off heap LABs + * be in on heap or off heap area depending on the MSLAB and its configuration to be using on + * heap or off heap LABs */ - protected volatile long dataSize; + private final long dataSize; - /** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead. + /**'getHeapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead. * When Cells in on heap area, this will include the cells data size as well. */ - protected volatile long heapSize; + private final long heapSize; /** off-heap size: the aggregated size of all data that is allocated off-heap including all * key-values that reside off-heap and the metadata that resides off-heap */ - protected volatile long offHeapSize; + private final long offHeapSize; - public MemStoreSize() { + /** + * Package private constructor. + */ + MemStoreSize() { this(0L, 0L, 0L); } - public MemStoreSize(long dataSize, long heapSize, long offHeapSize) { + /** + * Package private constructor. + */ + MemStoreSize(long dataSize, long heapSize, long offHeapSize) { this.dataSize = dataSize; this.heapSize = heapSize; this.offHeapSize = offHeapSize; } - protected MemStoreSize(MemStoreSize memStoreSize) { - this.dataSize = memStoreSize.dataSize; - this.heapSize = memStoreSize.heapSize; - this.offHeapSize = memStoreSize.offHeapSize; + /** + * Package private constructor. 
+ */ + MemStoreSize(MemStoreSize memStoreSize) { + this.dataSize = memStoreSize.getDataSize(); + this.heapSize = memStoreSize.getHeapSize(); + this.offHeapSize = memStoreSize.getOffHeapSize(); } + public boolean isEmpty() { return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0; } @@ -101,24 +93,22 @@ public class MemStoreSize { if (!(obj instanceof MemStoreSize)) { return false; } - MemStoreSize other = (MemStoreSize) obj; - return this.dataSize == other.dataSize - && this.heapSize == other.heapSize - && this.offHeapSize == other.offHeapSize; + MemStoreSize other = (MemStoreSize)obj; + return this.dataSize == other.dataSize && this.heapSize == other.heapSize && + this.offHeapSize == other.offHeapSize; } @Override public int hashCode() { - long h = 13 * this.dataSize; - h = h + 14 * this.heapSize; - h = h + 15 * this.offHeapSize; + long h = 31 * this.dataSize; + h = h + 31 * this.heapSize; + h = h + 31 * this.offHeapSize; return (int) h; } @Override public String toString() { - return "dataSize=" + this.dataSize - + " , heapSize=" + this.heapSize - + " , offHeapSize=" + this.offHeapSize; + return "dataSize=" + this.dataSize + ", getHeapSize=" + this.heapSize + + ", getOffHeapSize=" + this.offHeapSize; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java index 0b3e9256682..8430ac6186c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java @@ -21,61 +21,96 @@ import org.apache.yetus.audience.InterfaceAudience; /** * Accounting of current heap and data sizes. - * Allows read/write on data/heap size as opposed to {@Link MemStoreSize} which is read-only. - * For internal use. - * @see MemStoreSize + * Tracks 3 sizes: + *
+ * 1. data size: the aggregated size of all key-value not including meta data such as
+ * index, time range etc.
+ *
+ * 2. heap size: the aggregated size of all data that is allocated on-heap including all
+ * key-values that reside on-heap and the metadata that resides on-heap
+ *
+ * 3. off-heap size: the aggregated size of all data that is allocated off-heap including all
+ * key-values that reside off-heap and the metadata that resides off-heap
+ *
+ * 3 examples to illustrate their usage:
+ *
+ * Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated
+ * on-heap. The counters are <100MB, 120MB, 0>, respectively.
+ *
+ * Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata
+ * allocated on-heap (e.g., CAM index). The counters are <100MB, 20MB, 100MB>, respectively.
+ *
+ * Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB
+ * are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated
+ * off-heap (e.g., CCM index) and 5MB allocated on-heap (e.g., CSLM index in active).
+ * The counters are <100MB, 10MB, 110MB>, respectively.
+ * + * Like {@link TimeRangeTracker}, it has thread-safe and non-thread-safe implementations. */ @InterfaceAudience.Private -public class MemStoreSizing extends MemStoreSize { - public static final MemStoreSizing DUD = new MemStoreSizing() { +public interface MemStoreSizing { + static final MemStoreSizing DUD = new MemStoreSizing() { + private final MemStoreSize mss = new MemStoreSize(); - @Override public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, - long offHeapSizeDelta) { - throw new RuntimeException("I'm a dud, you can't use me!"); + @Override + public MemStoreSize getMemStoreSize() { + return this.mss; } - @Override public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, + @Override + public long getDataSize() { + return this.mss.getDataSize(); + } + + @Override + public long getHeapSize() { + return this.mss.getHeapSize(); + } + + @Override + public long getOffHeapSize() { + return this.mss.getOffHeapSize(); + } + + @Override + public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { - throw new RuntimeException("I'm a dud, you can't use me!"); + throw new RuntimeException("I'm a DUD, you can't use me!"); } }; - public MemStoreSizing() { - super(); + /** + * @return The new dataSize ONLY as a convenience + */ + long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta); + + default long incMemStoreSize(MemStoreSize delta) { + return incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize()); } - public MemStoreSizing(long dataSize, long heapSize, long offHeapSize) { - super(dataSize, heapSize, offHeapSize); + /** + * @return The new dataSize ONLY as a convenience + */ + default long decMemStoreSize(long dataSizeDelta, long heapSizeDelta, + long offHeapSizeDelta) { + return incMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta); } - public MemStoreSizing(MemStoreSize memStoreSize) { - super(memStoreSize); + default long decMemStoreSize(MemStoreSize delta) { + return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), -delta.getOffHeapSize()); } - public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { - this.dataSize += dataSizeDelta; - this.heapSize += heapSizeDelta; - this.offHeapSize += offHeapSizeDelta; - } - - public void incMemStoreSize(MemStoreSize delta) { - incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize()); - } - - public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { - this.dataSize -= dataSizeDelta; - this.heapSize -= heapSizeDelta; - this.offHeapSize -= offHeapSizeDelta; - } - - public void decMemStoreSize(MemStoreSize delta) { - decMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize()); - } - - public void empty() { - this.dataSize = 0L; - this.heapSize = 0L; - this.offHeapSize = 0L; - } + long getDataSize(); + long getHeapSize(); + long getOffHeapSize(); + /** + * @return Use this datastructure to return all three settings, {@link #getDataSize()}, + * {@link #getHeapSize()}, and {@link #getOffHeapSize()}, in the one go. 
+ */ + MemStoreSize getMemStoreSize(); } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java index 1349921a8d2..c72d38558fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -44,7 +44,7 @@ public class MutableSegment extends Segment { protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC)); - incSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata + incMemStoreSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata } /** @@ -89,7 +89,7 @@ public class MutableSegment extends Segment { int cellLen = getCellLength(cur); long heapSize = heapSizeChange(cur, true); long offHeapSize = offHeapSizeChange(cur, true); - this.incSize(-cellLen, -heapSize, -offHeapSize); + incMemStoreSize(-cellLen, -heapSize, -offHeapSize); if (memStoreSizing != null) { memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java new file mode 100644 index 00000000000..601ff33d58c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Accounting of current heap and data sizes. + * NOT THREAD SAFE. + * Use in a 'local' context only where just a single-thread is updating. No concurrency! + * Used, for example, when summing all Cells in a single batch where result is then applied to the + * Store. 
+ * @see ThreadSafeMemStoreSizing + */ +@InterfaceAudience.Private +class NonThreadSafeMemStoreSizing implements MemStoreSizing { + private long dataSize = 0; + private long heapSize = 0; + private long offHeapSize = 0; + + NonThreadSafeMemStoreSizing() { + this(0, 0, 0); + } + + NonThreadSafeMemStoreSizing(MemStoreSize mss) { + this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); + } + + NonThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) { + incMemStoreSize(dataSize, heapSize, offHeapSize); + } + + @Override + public MemStoreSize getMemStoreSize() { + return new MemStoreSize(this.dataSize, this.heapSize, this.offHeapSize); + } + + @Override + public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, + long offHeapSizeDelta) { + this.offHeapSize += offHeapSizeDelta; + this.heapSize += heapSizeDelta; + this.dataSize += dataSizeDelta; + return this.dataSize; + } + + @Override + public long getDataSize() { + return dataSize; + } + + @Override + public long getHeapSize() { + return heapSize; + } + + @Override + public long getOffHeapSize() { + return offHeapSize; + } + + @Override + public String toString() { + return getMemStoreSize().toString(); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java index 1c627f74b10..4e66fc7583a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java @@ -131,20 +131,20 @@ public class RegionServerAccounting { return this.globalMemStoreOffHeapSize.sum(); } - /** - * @param memStoreSize the Memstore size will be added to - * the global Memstore size - */ - public void incGlobalMemStoreSize(MemStoreSize memStoreSize) { - globalMemStoreDataSize.add(memStoreSize.getDataSize()); - globalMemStoreHeapSize.add(memStoreSize.getHeapSize()); - globalMemStoreOffHeapSize.add(memStoreSize.getOffHeapSize()); + void incGlobalMemStoreSize(MemStoreSize mss) { + incGlobalMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); } - public void decGlobalMemStoreSize(MemStoreSize memStoreSize) { - globalMemStoreDataSize.add(-memStoreSize.getDataSize()); - globalMemStoreHeapSize.add(-memStoreSize.getHeapSize()); - globalMemStoreOffHeapSize.add(-memStoreSize.getOffHeapSize()); + public void incGlobalMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { + globalMemStoreDataSize.add(dataSizeDelta); + globalMemStoreHeapSize.add(heapSizeDelta); + globalMemStoreOffHeapSize.add(offHeapSizeDelta); + } + + public void decGlobalMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { + globalMemStoreDataSize.add(-dataSizeDelta); + globalMemStoreHeapSize.add(-heapSizeDelta); + globalMemStoreOffHeapSize.add(-offHeapSizeDelta); } /** @@ -231,7 +231,7 @@ public class RegionServerAccounting { // the region open operation. No need to handle multi thread issues on one region's entry in // this Map. if (replayEdistsSize == null) { - replayEdistsSize = new MemStoreSizing(); + replayEdistsSize = new ThreadSafeMemStoreSizing(); replayEditsPerRegion.put(regionName, replayEdistsSize); } replayEdistsSize.incMemStoreSize(memStoreSize); @@ -244,10 +244,11 @@ public class RegionServerAccounting { * @param regionName the region which could not open. 
*/ public void rollbackRegionReplayEditsSize(byte[] regionName) { - MemStoreSize replayEditsSize = replayEditsPerRegion.get(regionName); - if (replayEditsSize != null) { + MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName); + if (replayEditsSizing != null) { clearRegionReplayEditsSize(regionName); - decGlobalMemStoreSize(replayEditsSize); + decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(), + replayEditsSizing.getOffHeapSize()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java index 5b98a273d12..b088856818d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java @@ -65,8 +65,8 @@ public class RegionServicesForStores { region.unblockUpdates(); } - public void addMemStoreSize(MemStoreSize size) { - region.incMemStoreSize(size); + public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { + region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta); } public RegionInfo getRegionInfo() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 8aafa42acff..10f9b248ceb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -553,9 +553,6 @@ public class ScannerContext { /** * Set all fields together. - * @param batch - * @param sizeScope - * @param dataSize */ void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize, LimitScope timeScope, long time) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 517f537d126..7069bf831e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -45,7 +45,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti * segments from active set to snapshot set in the default implementation. */ @InterfaceAudience.Private -public abstract class Segment { +public abstract class Segment implements MemStoreSizing { public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, memStoreSizing, @@ -59,9 +59,9 @@ public abstract class Segment { private final CellComparator comparator; protected long minSequenceId; private MemStoreLAB memStoreLAB; - // Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not + // Sum of sizes of all Cells added to this Segment. Cell's HeapSize is considered. This is not // including the heap overhead of this class. 
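Aside, not part of the patch: the RegionServerAccounting and RegionServicesForStores changes above replace MemStoreSize-typed parameters with raw long deltas. A minimal sketch of the resulting calling pattern, using only the public incGlobalMemStoreSize/decGlobalMemStoreSize signatures introduced in this diff; the example class and method names are hypothetical. The Segment.java hunks continue below.

```java
package org.apache.hadoop.hbase.regionserver;

// Sketch (hypothetical example class): unpack an immutable MemStoreSize once and forward
// plain long deltas to the new delta-based accounting methods.
public class GlobalAccountingExample {
  static void add(RegionServerAccounting accounting, MemStoreSizing batchSizing) {
    MemStoreSize mss = batchSizing.getMemStoreSize();
    accounting.incGlobalMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
  }

  static void rollBack(RegionServerAccounting accounting, MemStoreSize mss) {
    // Symmetric decrement, mirroring what rollbackRegionReplayEditsSize does above.
    accounting.decGlobalMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
  }
}
```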
- protected final MemStoreSizing segmentSize; + protected final MemStoreSizing memStoreSizing; protected final TimeRangeTracker timeRangeTracker; protected volatile boolean tagsPresent; @@ -69,7 +69,9 @@ public abstract class Segment { // and there is no need in true Segments state protected Segment(CellComparator comparator, TimeRangeTracker trt) { this.comparator = comparator; - this.segmentSize = new MemStoreSizing(); + // Do we need to be thread safe always? What if ImmutableSegment? + // DITTO for the TimeRangeTracker below. + this.memStoreSizing = new ThreadSafeMemStoreSizing(); this.timeRangeTracker = trt; } @@ -85,7 +87,9 @@ public abstract class Segment { OffHeapSize += memStoreSize.getOffHeapSize(); } this.comparator = comparator; - this.segmentSize = new MemStoreSizing(dataSize, heapSize, OffHeapSize); + // Do we need to be thread safe always? What if ImmutableSegment? + // DITTO for the TimeRangeTracker below. + this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize); this.timeRangeTracker = trt; } @@ -95,7 +99,9 @@ public abstract class Segment { this.comparator = comparator; this.minSequenceId = Long.MAX_VALUE; this.memStoreLAB = memStoreLAB; - this.segmentSize = new MemStoreSizing(); + // Do we need to be thread safe always? What if ImmutableSegment? + // DITTO for the TimeRangeTracker below. + this.memStoreSizing = new ThreadSafeMemStoreSizing(); this.tagsPresent = false; this.timeRangeTracker = trt; } @@ -105,7 +111,7 @@ public abstract class Segment { this.comparator = segment.getComparator(); this.minSequenceId = segment.getMinSequenceId(); this.memStoreLAB = segment.getMemStoreLAB(); - this.segmentSize = new MemStoreSizing(segment.getMemStoreSize()); + this.memStoreSizing = new ThreadSafeMemStoreSizing(segment.memStoreSizing.getMemStoreSize()); this.tagsPresent = segment.isTagsPresent(); this.timeRangeTracker = segment.getTimeRangeTracker(); } @@ -213,39 +219,29 @@ public abstract class Segment { return this; } + @Override public MemStoreSize getMemStoreSize() { - return this.segmentSize; + return this.memStoreSizing.getMemStoreSize(); } - /** - * @return Sum of all cell's size. - */ - public long keySize() { - return this.segmentSize.getDataSize(); + @Override + public long getDataSize() { + return this.memStoreSizing.getDataSize(); } - /** - * @return The heap size of this segment. - */ - public long heapSize() { - return this.segmentSize.getHeapSize(); + @Override + public long getHeapSize() { + return this.memStoreSizing.getHeapSize(); } - /** - * @return The off-heap size of this segment. 
- */ - public long offHeapSize() { - return this.segmentSize.getOffHeapSize(); + @Override + public long getOffHeapSize() { + return this.memStoreSizing.getOffHeapSize(); } - /** - * Updates the size counters of the segment by the given delta - */ - //TODO - protected void incSize(long delta, long heapOverhead, long offHeapOverhead) { - synchronized (this) { - this.segmentSize.incMemStoreSize(delta, heapOverhead, offHeapOverhead); - } + @Override + public long incMemStoreSize(long delta, long heapOverhead, long offHeapOverhead) { + return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead); } public long getMinSequenceId() { @@ -308,7 +304,7 @@ public abstract class Segment { } long heapSize = heapSizeChange(cellToAdd, succ); long offHeapSize = offHeapSizeChange(cellToAdd, succ); - incSize(cellSize, heapSize, offHeapSize); + incMemStoreSize(cellSize, heapSize, offHeapSize); if (memstoreSizing != null) { memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize); } @@ -408,8 +404,8 @@ public abstract class Segment { String res = "type=" + this.getClass().getSimpleName() + ", "; res += "empty=" + (isEmpty()? "yes": "no") + ", "; res += "cellCount=" + getCellsCount() + ", "; - res += "cellSize=" + keySize() + ", "; - res += "totalHeapSize=" + heapSize() + ", "; + res += "cellSize=" + getDataSize() + ", "; + res += "totalHeapSize=" + getHeapSize() + ", "; res += "min timestamp=" + timeRangeTracker.getMin() + ", "; res += "max timestamp=" + timeRangeTracker.getMax(); return res; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java new file mode 100644 index 00000000000..de0549386e8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Accounting of current heap and data sizes. + * Thread-safe. Many threads can do updates against this single instance. + * @see NonThreadSafeMemStoreSizing + * @see MemStoreSize + */ +@InterfaceAudience.Private +class ThreadSafeMemStoreSizing implements MemStoreSizing { + // We used to tie the update of these thread counters so + // they all changed together under one lock. This was + // undone. Doesn't seem necessary. 
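Aside, not part of the patch: with Segment now implementing MemStoreSizing, callers read its accounting through getDataSize()/getHeapSize()/getOffHeapSize() and getMemStoreSize() instead of the removed keySize()/heapSize()/offHeapSize() accessors. A minimal sketch under the assumption that the caller sits in the same package; the example class is hypothetical. The ThreadSafeMemStoreSizing listing continues below.

```java
package org.apache.hadoop.hbase.regionserver;

// Sketch (hypothetical example class): read a Segment's accounting through the
// MemStoreSizing interface it now implements.
public class SegmentSizingExample {
  static String describe(Segment segment) {
    // getDataSize()/getHeapSize()/getOffHeapSize() replace keySize()/heapSize()/offHeapSize().
    return "dataSize=" + segment.getDataSize()
        + ", heapSize=" + segment.getHeapSize()
        + ", offHeapSize=" + segment.getOffHeapSize();
  }

  static MemStoreSize snapshotOf(Segment segment) {
    // An immutable point-in-time view of the segment's counters.
    return segment.getMemStoreSize();
  }
}
```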
+ private final AtomicLong dataSize = new AtomicLong(); + private final AtomicLong heapSize = new AtomicLong(); + private final AtomicLong offHeapSize = new AtomicLong(); + + ThreadSafeMemStoreSizing() { + this(0, 0, 0); + } + + ThreadSafeMemStoreSizing(MemStoreSize mss) { + this(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); + } + + ThreadSafeMemStoreSizing(long dataSize, long heapSize, long offHeapSize) { + incMemStoreSize(dataSize, heapSize, offHeapSize); + } + + public MemStoreSize getMemStoreSize() { + return new MemStoreSize(getDataSize(), getHeapSize(), getOffHeapSize()); + } + + public long incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { + this.offHeapSize.addAndGet(offHeapSizeDelta); + this.heapSize.addAndGet(heapSizeDelta); + return this.dataSize.addAndGet(dataSizeDelta); + } + + @Override + public long getDataSize() { + return dataSize.get(); + } + + @Override + public long getHeapSize() { + return heapSize.get(); + } + + @Override + public long getOffHeapSize() { + return offHeapSize.get(); + } + + @Override + public String toString() { + return getMemStoreSize().toString(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 5cbfff96443..8dbddb9e44c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -355,7 +355,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException { // Save off old state. 
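Aside, not part of the patch: a minimal sketch of concurrent updates against the new ThreadSafeMemStoreSizing, whose three AtomicLong counters are bumped independently rather than under one shared lock, as the class comment above notes. The example class is hypothetical and is assumed to sit in the same package, since the sizing class is package-private. The TestCompactingMemStore hunks continue below.

```java
package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch (hypothetical example class): several writers update one ThreadSafeMemStoreSizing;
// each counter is an independent AtomicLong, so totals come out right without locking.
public class ThreadSafeSizingExample {
  public static void main(String[] args) throws InterruptedException {
    MemStoreSizing sizing = new ThreadSafeMemStoreSizing();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int t = 0; t < 4; t++) {
      pool.execute(() -> {
        for (int i = 0; i < 1000; i++) {
          sizing.incMemStoreSize(100, 120, 0); // 100 bytes of cell data, 120 bytes of heap
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    // Totals should reflect 4 threads * 1000 increments of (100, 120, 0).
    System.out.println(sizing.getMemStoreSize());
  }
}
```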
- long oldHistorySize = hmc.getSnapshot().keySize(); + long oldHistorySize = hmc.getSnapshot().getDataSize(); long prevTimeStamp = hmc.timeOfOldestEdit(); hmc.snapshot(); @@ -616,9 +616,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + // simulate flusher + region.decrMemStoreSize(mss); ImmutableSegment s = memstore.getSnapshot(); assertEquals(4, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemStoreSize()); @@ -667,7 +668,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); @@ -675,9 +676,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { + 7 * oneCellOnCAHeapSize; assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); - size = memstore.getFlushableSize(); + mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + // simulate flusher + region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); ImmutableSegment s = memstore.getSnapshot(); assertEquals(7, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemStoreSize()); @@ -722,7 +724,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); ((MyCompactingMemStore) memstore).disableCompaction(); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction assertEquals(0, memstore.getSnapshot().getCellsCount()); // No change in the cells data size. ie. memstore size. as there is no compaction. @@ -738,7 +740,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); ((MyCompactingMemStore)memstore).enableCompaction(); - size = memstore.getFlushableSize(); + mss = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact assertEquals(0, memstore.getSnapshot().getCellsCount()); // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 
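Aside, not part of the patch: the tests above and below repeatedly capture getFlushableSize() as an immutable MemStoreSize and then decrement the region by it to simulate the flusher. A condensed sketch of that pattern; the helper class and method are hypothetical, and the decrMemStoreSize overloads used in these tests are assumed to be reachable from the same package.

```java
package org.apache.hadoop.hbase.regionserver;

// Sketch (hypothetical helper): the "simulate flusher" pattern used by these tests.
// getFlushableSize() returns an immutable MemStoreSize snapshot that can be handed back
// to the region once the snapshot has been taken.
public class SimulateFlusherExample {
  static void simulateFlush(HRegion region, MemStore memstore) {
    MemStoreSize mss = memstore.getFlushableSize();
    memstore.snapshot(); // push the active cells into the snapshot
    // Either decrMemStoreSize overload works; the delta form avoids extra wrapping.
    region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
  }
}
```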
@@ -751,9 +753,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); - size = memstore.getFlushableSize(); + mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + // simulate flusher + region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); ImmutableSegment s = memstore.getSnapshot(); assertEquals(4, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemStoreSize()); @@ -811,9 +814,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { assertTrue(4 == numCells || 11 == numCells); assertEquals(0, memstore.getSnapshot().getCellsCount()); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + // simulate flusher + region.decrMemStoreSize(mss); ImmutableSegment s = memstore.getSnapshot(); numCells = s.getCellsCount(); assertTrue(4 == numCells || 11 == numCells); @@ -825,8 +829,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { byte[] fam = Bytes.toBytes("testfamily"); byte[] qf = Bytes.toBytes("testqualifier"); - long size = hmc.getActive().keySize(); - long heapOverhead = hmc.getActive().heapSize(); + long size = hmc.getActive().getDataSize(); + long heapOverhead = hmc.getActive().getHeapSize(); int totalLen = 0; for (int i = 0; i < keys.length; i++) { long timestamp = System.currentTimeMillis(); @@ -838,8 +842,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { hmc.add(kv, null); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); } - regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size, - hmc.getActive().heapSize() - heapOverhead, 0)); + regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, + hmc.getActive().getHeapSize() - heapOverhead, 0); return totalLen; } @@ -847,8 +851,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { byte[] fam = Bytes.toBytes("testfamily"); byte[] qf = Bytes.toBytes("testqualifier"); - long size = hmc.getActive().keySize(); - long heapOverhead = hmc.getActive().heapSize(); + long size = hmc.getActive().getDataSize(); + long heapOverhead = hmc.getActive().getHeapSize(); int totalLen = 0; for (int i = 0; i < keys.length; i++) { long timestamp = System.currentTimeMillis(); @@ -859,8 +863,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { hmc.add(kv, null); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); } - regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size, - hmc.getActive().heapSize() - heapOverhead, 0)); + regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, + hmc.getActive().getHeapSize() - heapOverhead, 0); return totalLen; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java index 12b078aae9f..e8241202b0b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java @@ -132,9 +132,9 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore counter += s.getCellsCount(); } assertEquals(3, counter); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + region.decrMemStoreSize(mss); // simulate flusher ImmutableSegment s = memstore.getSnapshot(); assertEquals(3, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemStoreSize()); @@ -194,9 +194,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore totalHeapSize2 = 1 * cellAfterFlushSize; assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + // simulate flusher + region.decrMemStoreSize(mss); ImmutableSegment s = memstore.getSnapshot(); assertEquals(4, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemStoreSize()); @@ -224,7 +225,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore assertEquals(totalCellsLen1, region.getMemStoreDataSize()); assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact assertEquals(0, memstore.getSnapshot().getCellsCount()); @@ -245,7 +246,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); ((MyCompactingMemStore) memstore).disableCompaction(); - size = memstore.getFlushableSize(); + mss = memstore.getFlushableSize(); ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; assertEquals(0, memstore.getSnapshot().getCellsCount()); @@ -260,7 +261,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore ((CompactingMemStore) memstore).heapSize()); ((MyCompactingMemStore) memstore).enableCompaction(); - size = memstore.getFlushableSize(); + mss = memstore.getFlushableSize(); ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { Threads.sleep(10); @@ -279,9 +280,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize()); - size = memstore.getFlushableSize(); + mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + // simulate flusher + 
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); ImmutableSegment s = memstore.getSnapshot(); assertEquals(4, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemStoreSize()); @@ -653,9 +655,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + // simulate flusher + region.decrMemStoreSize(mss); ImmutableSegment s = memstore.getSnapshot(); assertEquals(numOfCells, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemStoreSize()); @@ -725,9 +728,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + // simulate flusher + region.decrMemStoreSize(mss); ImmutableSegment s = memstore.getSnapshot(); assertEquals(numOfCells, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemStoreSize()); @@ -799,9 +803,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); - MemStoreSize size = memstore.getFlushableSize(); + MemStoreSize mss = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot - region.decrMemStoreSize(size); // simulate flusher + // simulate flusher + region.decrMemStoreSize(mss); ImmutableSegment s = memstore.getSnapshot(); assertEquals(numOfCells, s.getCellsCount()); assertEquals(0, regionServicesForStores.getMemStoreSize()); @@ -893,7 +898,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) { byte[] fam = Bytes.toBytes("testfamily"); byte[] qf = Bytes.toBytes("testqualifier"); - MemStoreSizing memstoreSizing = new MemStoreSizing(); + MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing(); for (int i = 0; i < keys.length; i++) { long timestamp = System.currentTimeMillis(); Threads.sleep(1); // to make sure each kv gets a different ts @@ -903,8 +908,10 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore hmc.add(kv, memstoreSizing); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); } - regionServicesForStores.addMemStoreSize(memstoreSizing); - return memstoreSizing.getDataSize(); + MemStoreSize mss = memstoreSizing.getMemStoreSize(); + regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), + mss.getOffHeapSize()); + return mss.getDataSize(); } private long cellBeforeFlushSize() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 
6da5ec0a74f..77f796f8c3b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -129,17 +129,18 @@ public class TestDefaultMemStore { public void testPutSameCell() { byte[] bytes = Bytes.toBytes(getName()); KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes); - MemStoreSizing sizeChangeForFirstCell = new MemStoreSizing(); + MemStoreSizing sizeChangeForFirstCell = new NonThreadSafeMemStoreSizing(); this.memstore.add(kv, sizeChangeForFirstCell); - MemStoreSizing sizeChangeForSecondCell = new MemStoreSizing(); + MemStoreSizing sizeChangeForSecondCell = new NonThreadSafeMemStoreSizing(); this.memstore.add(kv, sizeChangeForSecondCell); // make sure memstore size increase won't double-count MSLAB chunk size - assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getDataSize()); + assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getMemStoreSize().getDataSize()); Segment segment = this.memstore.getActive(); MemStoreLAB msLab = segment.getMemStoreLAB(); if (msLab != null) { // make sure memstore size increased even when writing the same cell, if using MSLAB - assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize()); + assertEquals(Segment.getCellLength(kv), + sizeChangeForSecondCell.getMemStoreSize().getDataSize()); // make sure chunk size increased even when writing the same cell, if using MSLAB if (msLab instanceof MemStoreLABImpl) { // since we add the chunkID at the 0th offset of the chunk and the @@ -149,8 +150,8 @@ public class TestDefaultMemStore { } } else { // make sure no memstore size change w/o MSLAB - assertEquals(0, sizeChangeForSecondCell.getDataSize()); - assertEquals(0, sizeChangeForSecondCell.getHeapSize()); + assertEquals(0, sizeChangeForSecondCell.getMemStoreSize().getDataSize()); + assertEquals(0, sizeChangeForSecondCell.getMemStoreSize().getHeapSize()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 35266896df4..73c88d2b2f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -361,8 +361,9 @@ public class TestHRegion { } finally { assertTrue("The regionserver should have thrown an exception", threwIOE); } - long sz = store.getFlushableSize().getDataSize(); - assertTrue("flushable size should be zero, but it is " + sz, sz == 0); + MemStoreSize mss = store.getFlushableSize(); + assertTrue("flushable size should be zero, but it is " + mss, + mss.getDataSize() == 0); HBaseTestingUtility.closeRegionAndWAL(region); } @@ -414,9 +415,10 @@ public class TestHRegion { } catch (IOException expected) { } long expectedSize = onePutSize * 2; - assertEquals("memstoreSize should be incremented", expectedSize, region.getMemStoreDataSize()); - assertEquals("flushable size should be incremented", expectedSize, - store.getFlushableSize().getDataSize()); + assertEquals("memstoreSize should be incremented", + expectedSize, region.getMemStoreDataSize()); + assertEquals("flushable size should be incremented", + expectedSize, store.getFlushableSize().getDataSize()); region.setCoprocessorHost(null); HBaseTestingUtility.closeRegionAndWAL(region); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index d06c1af89fe..3b4ce500390 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -367,7 +367,7 @@ public class TestHRegionReplayEvents { HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); - long storeFlushableSize = store.getFlushableSize().getHeapSize(); + MemStoreSize mss = store.getFlushableSize(); long storeSize = store.getSize(); long storeSizeUncompressed = store.getStoreSizeUncompressed(); if (flushDesc.getAction() == FlushAction.START_FLUSH) { @@ -391,8 +391,8 @@ public class TestHRegionReplayEvents { for (HStore s : secondaryRegion.getStores()) { assertEquals(expectedStoreFileCount, s.getStorefilesCount()); } - long newFlushableSize = store.getFlushableSize().getHeapSize(); - assertTrue(storeFlushableSize > newFlushableSize); + MemStoreSize newMss = store.getFlushableSize(); + assertTrue(mss.getHeapSize() > newMss.getHeapSize()); // assert that the region memstore is smaller now long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); @@ -466,7 +466,7 @@ public class TestHRegionReplayEvents { HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); - long storeFlushableSize = store.getFlushableSize().getHeapSize(); + MemStoreSize mss = store.getFlushableSize(); if (flushDesc.getAction() == FlushAction.START_FLUSH) { startFlushDesc = flushDesc; @@ -475,7 +475,7 @@ public class TestHRegionReplayEvents { assertNull(result.result); assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); assertTrue(regionMemstoreSize > 0); - assertTrue(storeFlushableSize > 0); + assertTrue(mss.getHeapSize() > 0); // assert that the store memstore is smaller now long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); @@ -616,8 +616,8 @@ public class TestHRegionReplayEvents { assertEquals(expectedStoreFileCount, s.getStorefilesCount()); } HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); - long newFlushableSize = store.getFlushableSize().getHeapSize(); - assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped + MemStoreSize mss = store.getFlushableSize(); + assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped // assert that the region memstore is same as before long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); @@ -706,8 +706,8 @@ public class TestHRegionReplayEvents { assertEquals(expectedStoreFileCount, s.getStorefilesCount()); } HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); - long newFlushableSize = store.getFlushableSize().getHeapSize(); - assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped + MemStoreSize mss = store.getFlushableSize(); + assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped // assert that the region memstore is smaller than before, but not empty long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); @@ -811,12 +811,12 @@ public class TestHRegionReplayEvents { 
assertEquals(expectedStoreFileCount, s.getStorefilesCount()); } HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); - long newFlushableSize = store.getFlushableSize().getHeapSize(); + MemStoreSize mss = store.getFlushableSize(); if (droppableMemstore) { // assert that the memstore is dropped - assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD); + assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); } else { - assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped + assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped } // assert that the region memstore is same as before (we could not drop) @@ -903,8 +903,8 @@ public class TestHRegionReplayEvents { assertEquals(expectedStoreFileCount, s.getStorefilesCount()); } HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); - long newFlushableSize = store.getFlushableSize().getHeapSize(); - assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD); + MemStoreSize mss = store.getFlushableSize(); + assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); // assert that the region memstore is empty long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java index dfe52d06001..ce833269cfd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java @@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory; */ @Category({VerySlowRegionServerTests.class, LargeTests.class}) @SuppressWarnings("deprecation") -public class TestHRegionWithInMemoryFlush extends TestHRegion{ +public class TestHRegionWithInMemoryFlush extends TestHRegion { @ClassRule public static final HBaseClassTestRule CLASS_RULE = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 8dadd9ba021..6803003672a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -240,8 +240,7 @@ public class TestHStore { */ @Test public void testFlushSizeSizing() throws Exception { - LOG.info("Setting up a faulty file system that cannot write in " + - this.name.getMethodName()); + LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); // Only retry once. 
conf.setInt("hbase.hstore.flush.retries.number", 1); @@ -260,15 +259,15 @@ public class TestHStore { // Initialize region init(name.getMethodName(), conf); - MemStoreSize size = store.memstore.getFlushableSize(); - assertEquals(0, size.getDataSize()); + MemStoreSize mss = store.memstore.getFlushableSize(); + assertEquals(0, mss.getDataSize()); LOG.info("Adding some data"); - MemStoreSizing kvSize = new MemStoreSizing(); + MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); // add the heap size of active (mutable) segment kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0); - size = store.memstore.getFlushableSize(); - assertEquals(kvSize, size); + mss = store.memstore.getFlushableSize(); + assertEquals(kvSize.getMemStoreSize(), mss); // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. try { LOG.info("Flushing"); @@ -280,23 +279,23 @@ public class TestHStore { // due to snapshot, change mutable to immutable segment kvSize.incMemStoreSize(0, CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD, 0); - size = store.memstore.getFlushableSize(); - assertEquals(kvSize, size); - MemStoreSizing kvSize2 = new MemStoreSizing(); + mss = store.memstore.getFlushableSize(); + assertEquals(kvSize.getMemStoreSize(), mss); + MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2); kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0); // Even though we add a new kv, we expect the flushable size to be 'same' since we have // not yet cleared the snapshot -- the above flush failed. - assertEquals(kvSize, size); + assertEquals(kvSize.getMemStoreSize(), mss); ffs.fault.set(false); flushStore(store, id++); - size = store.memstore.getFlushableSize(); + mss = store.memstore.getFlushableSize(); // Size should be the foreground kv size. 
- assertEquals(kvSize2, size); + assertEquals(kvSize2.getMemStoreSize(), mss); flushStore(store, id++); - size = store.memstore.getFlushableSize(); - assertEquals(0, size.getDataSize()); - assertEquals(MutableSegment.DEEP_OVERHEAD, size.getHeapSize()); + mss = store.memstore.getFlushableSize(); + assertEquals(0, mss.getDataSize()); + assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); return null; } }); @@ -1226,7 +1225,7 @@ public class TestHStore { byte[] value0 = Bytes.toBytes("value0"); byte[] value1 = Bytes.toBytes("value1"); byte[] value2 = Bytes.toBytes("value2"); - MemStoreSizing memStoreSizing = new MemStoreSizing(); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); long ts = EnvironmentEdgeManager.currentTime(); long seqId = 100; init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), @@ -1285,7 +1284,7 @@ public class TestHStore { init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); byte[] value = Bytes.toBytes("value"); - MemStoreSizing memStoreSizing = new MemStoreSizing(); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); long ts = EnvironmentEdgeManager.currentTime(); long seqId = 100; // older data whihc shouldn't be "seen" by client @@ -1363,7 +1362,7 @@ public class TestHStore { }); byte[] oldValue = Bytes.toBytes("oldValue"); byte[] currentValue = Bytes.toBytes("currentValue"); - MemStoreSizing memStoreSizing = new MemStoreSizing(); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); long ts = EnvironmentEdgeManager.currentTime(); long seqId = 100; // older data whihc shouldn't be "seen" by client @@ -1479,7 +1478,7 @@ public class TestHStore { init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); byte[] value = Bytes.toBytes("thisisavarylargevalue"); - MemStoreSizing memStoreSizing = new MemStoreSizing(); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); long ts = EnvironmentEdgeManager.currentTime(); long seqId = 100; // older data whihc shouldn't be "seen" by client @@ -1601,7 +1600,7 @@ public class TestHStore { conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); // Set the lower threshold to invoke the "MERGE" policy MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); - MemStoreSizing memStoreSizing = new MemStoreSizing(); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); long ts = System.currentTimeMillis(); long seqID = 1L; // Add some data to the region and do some flushes
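Aside, not part of the patch: the TestHStore changes above compare a locally accumulated NonThreadSafeMemStoreSizing snapshot against the store's flushable size with assertEquals, which presumes MemStoreSize implements value equality over its data, heap and off-heap counters. A minimal sketch of that assertion idiom; the example class is hypothetical and accesses the package-private HStore.memstore field, so it is assumed to live in the same package as the tests.

```java
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;

// Sketch (hypothetical helper): compare a locally tracked sizing against the store's
// flushable size, as the updated TestHStore assertions do.
public class FlushableSizeAssertionExample {
  static void assertFlushableMatches(HStore store, MemStoreSizing expected) {
    MemStoreSize mss = store.memstore.getFlushableSize();
    // Relies on MemStoreSize value equality over (dataSize, heapSize, offHeapSize).
    assertEquals(expected.getMemStoreSize(), mss);
  }
}
```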