From 60367b2a27840a14b468325edf6ac62d0962770f Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Mon, 23 Oct 2017 20:57:46 -0700 Subject: [PATCH] HBASE-19074 Miscellaneous Observer cleanups Breaks MemStoreSize into MemStoreSize (read-only) and MemStoreSizing (read/write). MemStoreSize we allow to Coprocesors. MemStoreSizing we use internally doing MemStore accounting. --- .../hbase/coprocessor/RegionObserver.java | 6 + .../hadoop/hbase/coprocessor/WALObserver.java | 6 + .../hbase/regionserver/AbstractMemStore.java | 28 +++-- .../CellArrayImmutableSegment.java | 5 +- .../CellChunkImmutableSegment.java | 6 +- .../regionserver/CompactingMemStore.java | 20 ++-- .../regionserver/CompactionPipeline.java | 22 ++-- .../CompositeImmutableSegment.java | 4 +- .../hbase/regionserver/DefaultMemStore.java | 16 +-- .../hadoop/hbase/regionserver/HRegion.java | 61 +++++------ .../hadoop/hbase/regionserver/HStore.java | 18 +-- .../hadoop/hbase/regionserver/MemStore.java | 12 +- .../hbase/regionserver/MemStoreSize.java | 66 ++++------- .../hbase/regionserver/MemStoreSizing.java | 103 ++++++++++++++++++ .../MiniBatchOperationInProgress.java | 3 +- .../hbase/regionserver/MutableSegment.java | 16 ++- .../regionserver/RegionServerAccounting.java | 9 +- .../hadoop/hbase/regionserver/Segment.java | 16 +-- .../hbase/regionserver/SegmentFactory.java | 7 +- .../TestCompactingToCellFlatMapMemStore.java | 8 +- .../regionserver/TestDefaultMemStore.java | 4 +- .../hadoop/hbase/regionserver/TestHStore.java | 86 +++++++-------- .../wal/AbstractTestWALReplay.java | 8 +- 23 files changed, 308 insertions(+), 222 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 815daf184ac..5c89149fa42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -880,7 +880,10 @@ public interface RegionObserver { * Called before a {@link WALEdit} * replayed for this region. * @param ctx the environment provided by the region server + * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced + * with something that doesn't expose IntefaceAudience.Private classes. */ + @Deprecated default void preWALRestore(ObserverContext ctx, RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} @@ -888,7 +891,10 @@ public interface RegionObserver { * Called after a {@link WALEdit} * replayed for this region. * @param ctx the environment provided by the region server + * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced + * with something that doesn't expose IntefaceAudience.Private classes. */ + @Deprecated default void postWALRestore(ObserverContext ctx, RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java index 2190abf6067..ec8518b039f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java @@ -72,8 +72,11 @@ public interface WALObserver { * is writen to WAL. * * @return true if default behavior should be bypassed, false otherwise + * @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose + * InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0. */ // TODO: return value is not used + @Deprecated default boolean preWALWrite(ObserverContext ctx, RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { return false; @@ -82,7 +85,10 @@ public interface WALObserver { /** * Called after a {@link WALEdit} * is writen to WAL. + * @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose + * InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0. */ + @Deprecated default void postWALWrite(ObserverContext ctx, RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 4cb9ed1f4be..ee480be1e2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -96,14 +96,14 @@ public abstract class AbstractMemStore implements MemStore { public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent); @Override - public void add(Iterable cells, MemStoreSize memstoreSize) { + public void add(Iterable cells, MemStoreSizing memstoreSizing) { for (Cell cell : cells) { - add(cell, memstoreSize); + add(cell, memstoreSizing); } } @Override - public void add(Cell cell, MemStoreSize memstoreSize) { + public void add(Cell cell, MemStoreSizing memstoreSizing) { Cell toAdd = maybeCloneWithAllocator(cell); boolean mslabUsed = (toAdd != cell); // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By @@ -118,7 +118,7 @@ public abstract class AbstractMemStore implements MemStore { if (!mslabUsed) { toAdd = deepCopyIfNeeded(toAdd); } - internalAdd(toAdd, mslabUsed, memstoreSize); + internalAdd(toAdd, mslabUsed, memstoreSizing); } private static Cell deepCopyIfNeeded(Cell cell) { @@ -129,9 +129,9 @@ public abstract class AbstractMemStore implements MemStore { } @Override - public void upsert(Iterable cells, long readpoint, MemStoreSize memstoreSize) { + public void upsert(Iterable cells, long readpoint, MemStoreSizing memstoreSizing) { for (Cell cell : cells) { - upsert(cell, readpoint, memstoreSize); + upsert(cell, readpoint, memstoreSizing); } } @@ -167,7 +167,11 @@ public abstract class AbstractMemStore implements MemStore { @Override public MemStoreSize getSnapshotSize() { - return new MemStoreSize(this.snapshot.keySize(), this.snapshot.heapSize()); + return getSnapshotSizing(); + } + + MemStoreSizing getSnapshotSizing() { + return new MemStoreSizing(this.snapshot.keySize(), this.snapshot.heapSize()); } @Override @@ -210,7 +214,7 @@ public abstract class AbstractMemStore implements MemStore { * @param readpoint readpoint below which we can safely remove duplicate KVs * @param memstoreSize */ - private void upsert(Cell cell, long readpoint, MemStoreSize memstoreSize) { + private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) { // Add the Cell to the MemStore // Use the internalAdd method here since we (a) already have a lock // and (b) cannot safely use the MSLAB here without potentially @@ -221,7 +225,7 @@ public abstract class AbstractMemStore implements MemStore { // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and // prevent it from getting GCed. cell = deepCopyIfNeeded(cell); - this.active.upsert(cell, readpoint, memstoreSize); + this.active.upsert(cell, readpoint, memstoreSizing); setOldestEditTimeToNow(); checkActiveSize(); } @@ -277,8 +281,8 @@ public abstract class AbstractMemStore implements MemStore { * @param mslabUsed whether using MSLAB * @param memstoreSize */ - private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSize memstoreSize) { - active.add(toAdd, mslabUsed, memstoreSize); + private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) { + active.add(toAdd, mslabUsed, memstoreSizing); setOldestEditTimeToNow(); checkActiveSize(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java index ea49fcaf55c..8cd5f2a420f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.util.ClassSize; @@ -55,7 +54,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment { * of CSLMImmutableSegment * The given iterator returns the Cells that "survived" the compaction. */ - protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSize memstoreSize) { + protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing) { super(segment); // initiailize the upper class incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM); int numOfCells = segment.getCellsCount(); @@ -65,7 +64,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment { // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes) long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); incSize(0, newSegmentSizeDelta); - memstoreSize.incMemStoreSize(0, newSegmentSizeDelta); + memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java index b680b61f283..4ef0657ca5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.ByteBufferKeyValue; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.yetus.audience.InterfaceAudience; @@ -61,7 +60,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment { * of CSLMImmutableSegment * The given iterator returns the Cells that "survived" the compaction. */ - protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemStoreSize memstoreSize) { + protected CellChunkImmutableSegment(CSLMImmutableSegment segment, + MemStoreSizing memstoreSizing) { super(segment); // initiailize the upper class incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM); int numOfCells = segment.getCellsCount(); @@ -73,7 +73,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); incSize(0, newSegmentSizeDelta); - memstoreSize.incMemStoreSize(0, newSegmentSizeDelta); + memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 01138df92ad..e6f9451d860 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -146,12 +146,12 @@ public class CompactingMemStore extends AbstractMemStore { */ @Override public MemStoreSize size() { - MemStoreSize memstoreSize = new MemStoreSize(); - memstoreSize.incMemStoreSize(this.active.keySize(), this.active.heapSize()); + MemStoreSizing memstoreSizing = new MemStoreSizing(); + memstoreSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize()); for (Segment item : pipeline.getSegments()) { - memstoreSize.incMemStoreSize(item.keySize(), item.heapSize()); + memstoreSizing.incMemStoreSize(item.keySize(), item.heapSize()); } - return memstoreSize; + return memstoreSizing; } /** @@ -215,17 +215,17 @@ public class CompactingMemStore extends AbstractMemStore { */ @Override public MemStoreSize getFlushableSize() { - MemStoreSize snapshotSize = getSnapshotSize(); - if (snapshotSize.getDataSize() == 0) { + MemStoreSizing snapshotSizing = getSnapshotSizing(); + if (snapshotSizing.getDataSize() == 0) { // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed if (compositeSnapshot) { - snapshotSize = pipeline.getPipelineSize(); - snapshotSize.incMemStoreSize(this.active.keySize(), this.active.heapSize()); + snapshotSizing = pipeline.getPipelineSizing(); + snapshotSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize()); } else { - snapshotSize = pipeline.getTailSize(); + snapshotSizing = pipeline.getTailSizing(); } } - return snapshotSize.getDataSize() > 0 ? snapshotSize + return snapshotSizing.getDataSize() > 0 ? snapshotSizing : new MemStoreSize(this.active.keySize(), this.active.heapSize()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 944a7feffb8..75f9914ecb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -149,7 +149,7 @@ public class CompactionPipeline { long newHeapSize = 0; if(segment != null) newHeapSize = segment.heapSize(); long heapSizeDelta = suffixHeapSize - newHeapSize; - region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta)); + region.addMemStoreSize(new MemStoreSizing(-dataSizeDelta, -heapSizeDelta)); if (LOG.isDebugEnabled()) { LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: " + newDataSize + ". Suffix heap size: " + suffixHeapSize @@ -199,14 +199,14 @@ public class CompactionPipeline { int i = 0; for (ImmutableSegment s : pipeline) { if ( s.canBeFlattened() ) { - MemStoreSize newMemstoreSize = new MemStoreSize(); // the size to be updated + MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( - (CSLMImmutableSegment)s,idxType,newMemstoreSize); + (CSLMImmutableSegment)s,idxType,newMemstoreAccounting); replaceAtIndex(i,newS); if(region != null) { // update the global memstore size counter // upon flattening there is no change in the data size - region.addMemStoreSize(new MemStoreSize(0, newMemstoreSize.getHeapSize())); + region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize())); } LOG.debug("Compaction pipeline segment " + s + " was flattened"); return true; @@ -241,22 +241,22 @@ public class CompactionPipeline { return minSequenceId; } - public MemStoreSize getTailSize() { + public MemStoreSizing getTailSizing() { LinkedList localCopy = readOnlyCopy; - if (localCopy.isEmpty()) return new MemStoreSize(true); - return new MemStoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize()); + if (localCopy.isEmpty()) return new MemStoreSizing(); + return new MemStoreSizing(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize()); } - public MemStoreSize getPipelineSize() { + public MemStoreSizing getPipelineSizing() { long keySize = 0; long heapSize = 0; LinkedList localCopy = readOnlyCopy; - if (localCopy.isEmpty()) return new MemStoreSize(true); + if (localCopy.isEmpty()) return new MemStoreSizing(); for (Segment segment : localCopy) { keySize += segment.keySize(); heapSize += segment.heapSize(); } - return new MemStoreSize(keySize, heapSize); + return new MemStoreSizing(keySize, heapSize); } private void swapSuffix(List suffix, ImmutableSegment segment, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index ef4c3cd4de3..93658193c1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -257,13 +257,13 @@ public class CompositeImmutableSegment extends ImmutableSegment { } @Override - protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) { + protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) { throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } @Override protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed, - MemStoreSize memstoreSize) { + MemStoreSizing memstoreSizing) { throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 4ce6e5b1a0e..b1a87beab2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -195,26 +195,26 @@ public class DefaultMemStore extends AbstractMemStore { byte [] fam = Bytes.toBytes("col"); byte [] qf = Bytes.toBytes("umn"); byte [] empty = new byte[0]; - MemStoreSize memstoreSize = new MemStoreSize(); + MemStoreSizing memstoreSizing = new MemStoreSizing(); for (int i = 0; i < count; i++) { // Give each its own ts - memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize); + memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing); } LOG.info("memstore1 estimated size=" - + (memstoreSize.getDataSize() + memstoreSize.getHeapSize())); + + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize())); for (int i = 0; i < count; i++) { - memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize); + memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing); } LOG.info("memstore1 estimated size (2nd loading of same data)=" - + (memstoreSize.getDataSize() + memstoreSize.getHeapSize())); + + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize())); // Make a variably sized memstore. DefaultMemStore memstore2 = new DefaultMemStore(); - memstoreSize = new MemStoreSize(); + memstoreSizing = new MemStoreSizing(); for (int i = 0; i < count; i++) { - memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSize); + memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSizing); } LOG.info("memstore2 estimated size=" - + (memstoreSize.getDataSize() + memstoreSize.getHeapSize())); + + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize())); final int seconds = 30; LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); LOG.info("Exiting."); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 99f5c355730..e03e4dd0262 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -537,11 +537,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final long startTime; final long flushOpSeqId; final long flushedSeqId; - final MemStoreSize totalFlushableSize; + final MemStoreSizing totalFlushableSize; /** Constructs an early exit case */ PrepareFlushResult(FlushResultImpl result, long flushSeqId) { - this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemStoreSize()); + this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, MemStoreSizing.DUD); } /** Constructs a successful prepare flush result */ @@ -549,7 +549,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi TreeMap storeFlushCtxs, TreeMap> committedFiles, TreeMap storeFlushableSize, long startTime, long flushSeqId, - long flushedSeqId, MemStoreSize totalFlushableSize) { + long flushedSeqId, MemStoreSizing totalFlushableSize) { this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushSeqId, flushedSeqId, totalFlushableSize); } @@ -559,7 +559,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi TreeMap storeFlushCtxs, TreeMap> committedFiles, TreeMap storeFlushableSize, long startTime, long flushSeqId, - long flushedSeqId, MemStoreSize totalFlushableSize) { + long flushedSeqId, MemStoreSizing totalFlushableSize) { this.result = result; this.storeFlushCtxs = storeFlushCtxs; this.committedFiles = committedFiles; @@ -1711,7 +1711,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.closed.set(true); if (!canFlush) { - this.decrMemStoreSize(new MemStoreSize(memstoreDataSize.get(), getMemStoreHeapSize())); + this.decrMemStoreSize(new MemStoreSizing(memstoreDataSize.get(), getMemStoreHeapSize())); } else if (memstoreDataSize.get() != 0) { LOG.error("Memstore size is " + memstoreDataSize.get()); } @@ -2502,7 +2502,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName()); - MemStoreSize totalSizeOfFlushableStores = new MemStoreSize(); + MemStoreSizing totalSizeOfFlushableStores = new MemStoreSizing(); Map flushedFamilyNamesToSeq = new HashMap<>(); for (HStore store : storesToFlush) { @@ -2546,7 +2546,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi for (HStore s : storesToFlush) { MemStoreSize flushableSize = s.getFlushableSize(); totalSizeOfFlushableStores.incMemStoreSize(flushableSize); - storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), s.createFlushContext(flushOpSeqId)); + storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), + s.createFlushContext(flushOpSeqId)); committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); // for writing stores to WAL storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), flushableSize); } @@ -3323,7 +3324,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int cellCount = 0; /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); - MemStoreSize memStoreSize = new MemStoreSize(); + MemStoreSizing memStoreAccounting = new MemStoreSizing(); try { // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. int numReadyToWrite = 0; @@ -3506,11 +3507,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.updateSequenceId(batchOp.familyCellMaps[i].values(), replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); } - applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreSize); + applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreAccounting); } // update memstore size - this.addAndGetMemStoreSize(memStoreSize); + this.addAndGetMemStoreSize(memStoreAccounting); // calling the post CP hook for batch mutation if (!replay && coprocessorHost != null) { @@ -3983,12 +3984,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param memstoreSize */ private void applyFamilyMapToMemStore(Map> familyMap, - MemStoreSize memstoreSize) throws IOException { + MemStoreSizing memstoreAccounting) throws IOException { for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); List cells = e.getValue(); assert cells instanceof RandomAccess; - applyToMemStore(getStore(family), cells, false, memstoreSize); + applyToMemStore(getStore(family), cells, false, memstoreAccounting); } } @@ -3996,30 +3997,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be * set; when set we will run operations that make sense in the increment/append scenario * but that do not make sense otherwise. - * @see #applyToMemStore(HStore, Cell, MemStoreSize) + * @see #applyToMemStore(HStore, Cell, MemStoreSizing) */ private void applyToMemStore(HStore store, List cells, boolean delta, - MemStoreSize memstoreSize) throws IOException { + MemStoreSizing memstoreAccounting) throws IOException { // Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!! boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1; if (upsert) { - store.upsert(cells, getSmallestReadPoint(), memstoreSize); + store.upsert(cells, getSmallestReadPoint(), memstoreAccounting); } else { - store.add(cells, memstoreSize); + store.add(cells, memstoreAccounting); } } /** - * @see #applyToMemStore(HStore, List, boolean, MemStoreSize) + * @see #applyToMemStore(HStore, List, boolean, MemStoreSizing) */ - private void applyToMemStore(HStore store, Cell cell, MemStoreSize memstoreSize) + private void applyToMemStore(HStore store, Cell cell, MemStoreSizing memstoreAccounting) throws IOException { // Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!! if (store == null) { checkFamily(CellUtil.cloneFamily(cell)); // Unreachable because checkFamily will throw exception } - store.add(cell, memstoreSize); + store.add(cell, memstoreAccounting); } /** @@ -4347,7 +4348,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } boolean flush = false; - MemStoreSize memstoreSize = new MemStoreSize(); + MemStoreSizing memstoreSize = new MemStoreSizing(); for (Cell cell: val.getCells()) { // Check this edit is for me. Also, guard against writing the special // METACOLUMN info such as HBASE::CACHEFLUSH entries @@ -4843,7 +4844,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException */ private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException { - MemStoreSize totalFreedSize = new MemStoreSize(); + MemStoreSizing totalFreedSize = new MemStoreSizing(); this.updatesLock.writeLock().lock(); try { @@ -5271,15 +5272,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Used by tests * @param s Store to add edit too. * @param cell Cell to add. - * @param memstoreSize */ @VisibleForTesting - protected void restoreEdit(HStore s, Cell cell, MemStoreSize memstoreSize) { - s.add(cell, memstoreSize); + protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreAccounting) { + s.add(cell, memstoreAccounting); } /** - * @param fs * @param p File to check. * @return True if file was zero-length (and if so, we'll delete it in here). * @throws IOException @@ -7114,7 +7113,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // This is assigned by mvcc either explicity in the below or in the guts of the WAL append // when it assigns the edit a sequencedid (A.K.A the mvcc write number). WriteEntry writeEntry = null; - MemStoreSize memstoreSize = new MemStoreSize(); + MemStoreSizing memstoreAccounting = new MemStoreSizing(); try { boolean success = false; try { @@ -7158,7 +7157,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If no WAL, need to stamp it here. CellUtil.setSequenceId(cell, sequenceId); } - applyToMemStore(getStore(cell), cell, memstoreSize); + applyToMemStore(getStore(cell), cell, memstoreAccounting); } } @@ -7194,7 +7193,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } finally { closeRegionOperation(); if (!mutations.isEmpty()) { - long newSize = this.addAndGetMemStoreSize(memstoreSize); + long newSize = this.addAndGetMemStoreSize(memstoreAccounting); requestFlushIfNeeded(newSize); } } @@ -7298,7 +7297,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(op); List results = returnResults? new ArrayList<>(mutation.size()): null; RowLock rowLock = null; - MemStoreSize memstoreSize = new MemStoreSize(); + MemStoreSizing memstoreAccounting = new MemStoreSizing(); try { rowLock = getRowLockInternal(mutation.getRow(), false); lock(this.updatesLock.readLock()); @@ -7324,7 +7323,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Now write to MemStore. Do it a column family at a time. for (Map.Entry> e : forMemStore.entrySet()) { - applyToMemStore(e.getKey(), e.getValue(), true, memstoreSize); + applyToMemStore(e.getKey(), e.getValue(), true, memstoreAccounting); } mvcc.completeAndWait(writeEntry); if (rsServices != null && rsServices.getNonceManager() != null) { @@ -7347,7 +7346,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowLock.release(); } // Request a cache flush if over the limit. Do it outside update lock. - if (isFlushSize(addAndGetMemStoreSize(memstoreSize))) { + if (isFlushSize(addAndGetMemStoreSize(memstoreAccounting))) { requestFlush(); } closeRegionOperation(op); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 2ec5437ebf7..e3d6724b3de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -682,13 +682,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat /** * Adds a value to the memstore - * @param cell - * @param memstoreSize */ - public void add(final Cell cell, MemStoreSize memstoreSize) { + public void add(final Cell cell, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { - this.memstore.add(cell, memstoreSize); + this.memstore.add(cell, memstoreSizing); } finally { lock.readLock().unlock(); } @@ -696,13 +694,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat /** * Adds the specified value to the memstore - * @param cells - * @param memstoreSize */ - public void add(final Iterable cells, MemStoreSize memstoreSize) { + public void add(final Iterable cells, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { - memstore.add(cells, memstoreSize); + memstore.add(cells, memstoreSizing); } finally { lock.readLock().unlock(); } @@ -2143,16 +2139,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat *

* This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic * across all of them. - * @param cells * @param readpoint readpoint below which we can safely remove duplicate KVs - * @param memstoreSize * @throws IOException */ - public void upsert(Iterable cells, long readpoint, MemStoreSize memstoreSize) + public void upsert(Iterable cells, long readpoint, MemStoreSizing memstoreSizing) throws IOException { this.lock.readLock().lock(); try { - this.memstore.upsert(cells, readpoint, memstoreSize); + this.memstore.upsert(cells, readpoint, memstoreSizing); } finally { this.lock.readLock().unlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 324afbe220c..af7e5d570c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -68,18 +68,18 @@ public interface MemStore { /** * Write an update * @param cell - * @param memstoreSize The delta in memstore size will be passed back via this. + * @param memstoreSizing The delta in memstore size will be passed back via this. * This will include both data size and heap overhead delta. */ - void add(final Cell cell, MemStoreSize memstoreSize); + void add(final Cell cell, MemStoreSizing memstoreSizing); /** * Write the updates * @param cells - * @param memstoreSize The delta in memstore size will be passed back via this. + * @param memstoreSizing The delta in memstore size will be passed back via this. * This will include both data size and heap overhead delta. */ - void add(Iterable cells, MemStoreSize memstoreSize); + void add(Iterable cells, MemStoreSizing memstoreSizing); /** * @return Oldest timestamp of all the Cells in the MemStore @@ -99,10 +99,10 @@ public interface MemStore { * only see each KeyValue update as atomic. * @param cells * @param readpoint readpoint below which we can safely remove duplicate Cells. - * @param memstoreSize The delta in memstore size will be passed back via this. + * @param memstoreSizing The delta in memstore size will be passed back via this. * This will include both data size and heap overhead delta. */ - void upsert(Iterable cells, long readpoint, MemStoreSize memstoreSize); + void upsert(Iterable cells, long readpoint, MemStoreSizing memstoreSizing); /** * @return scanner over the memstore. This might include scanner over the snapshot when one is diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java index a588b2049ef..cf2ef6fe600 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,71 +17,47 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; /** - * Wraps the data size part and total heap space occupied by the memstore. + * Reports the data size part and total heap space occupied by the MemStore. + * Read-only. + * @see MemStoreSizing */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) public class MemStoreSize { + /** + *'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can + * be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap + * or off heap LABs + */ + protected long dataSize; - // 'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can - // be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap - // or off heap LABs - private long dataSize; - // 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead. - // When Cells in on heap area, this will include the cells data size as well. - private long heapSize; - final private boolean isEmpty; + /** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead. + * When Cells in on heap area, this will include the cells data size as well. + */ + protected long heapSize; public MemStoreSize() { - dataSize = 0; - heapSize = 0; - isEmpty = false; - } - - public MemStoreSize(boolean isEmpty) { - dataSize = 0; - heapSize = 0; - this.isEmpty = isEmpty; - } - - public boolean isEmpty() { - return isEmpty; + this(0L, 0L); } public MemStoreSize(long dataSize, long heapSize) { this.dataSize = dataSize; this.heapSize = heapSize; - this.isEmpty = false; } - public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) { - this.dataSize += dataSizeDelta; - this.heapSize += heapSizeDelta; - } - - public void incMemStoreSize(MemStoreSize delta) { - this.dataSize += delta.dataSize; - this.heapSize += delta.heapSize; - } - - public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) { - this.dataSize -= dataSizeDelta; - this.heapSize -= heapSizeDelta; - } - - public void decMemStoreSize(MemStoreSize delta) { - this.dataSize -= delta.dataSize; - this.heapSize -= delta.heapSize; + public boolean isEmpty() { + return this.dataSize == 0 && this.heapSize == 0; } public long getDataSize() { - return isEmpty ? 0 : dataSize; + return this.dataSize; } public long getHeapSize() { - return isEmpty ? 0 : heapSize; + return this.heapSize; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java new file mode 100644 index 00000000000..fade6222513 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Accounting of current heap and data sizes. + * Allows read/write on data/heap size as opposed to {@Link MemStoreSize} which is read-only. + * For internal use. + * @see MemStoreSize + */ +@InterfaceAudience.Private +public class MemStoreSizing extends MemStoreSize { + public static final MemStoreSizing DUD = new MemStoreSizing() { + @Override + public void incMemStoreSize(MemStoreSize delta) { + incMemStoreSize(delta.getDataSize(), delta.getHeapSize()); + } + + @Override + public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) { + throw new RuntimeException("I'm a dud, you can't use me!"); + } + + @Override + public void decMemStoreSize(MemStoreSize delta) { + decMemStoreSize(delta.getDataSize(), delta.getHeapSize()); + } + + @Override + public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) { + throw new RuntimeException("I'm a dud, you can't use me!"); + } + }; + + public MemStoreSizing() { + super(); + } + + public MemStoreSizing(long dataSize, long heapSize) { + super(dataSize, heapSize); + } + + public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) { + this.dataSize += dataSizeDelta; + this.heapSize += heapSizeDelta; + } + + public void incMemStoreSize(MemStoreSize delta) { + incMemStoreSize(delta.getDataSize(), delta.getHeapSize()); + } + + public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) { + this.dataSize -= dataSizeDelta; + this.heapSize -= heapSizeDelta; + } + + public void decMemStoreSize(MemStoreSize delta) { + decMemStoreSize(delta.getDataSize(), delta.getHeapSize()); + } + + public void empty() { + this.dataSize = 0L; + this.heapSize = 0L; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof MemStoreSizing)) { + return false; + } + MemStoreSizing other = (MemStoreSizing) obj; + return this.dataSize == other.dataSize && this.heapSize == other.heapSize; + } + + @Override + public int hashCode() { + long h = 13 * this.dataSize; + h = h + 14 * this.heapSize; + return (int) h; + } + + @Override + public String toString() { + return "dataSize=" + this.dataSize + " , heapSize=" + this.heapSize; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java index c04e41e0a8f..56a97e08355 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.wal.WALEdit; @@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.wal.WALEdit; * org.apache.hadoop.hbase.coprocessor.ObserverContext, MiniBatchOperationInProgress) * @param T Pair<Mutation, Integer> pair of Mutations and associated rowlock ids . */ -@InterfaceAudience.LimitedPrivate("Coprocessors") +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) public class MiniBatchOperationInProgress { private final T[] operations; private Mutation[][] operationsFromCoprocessors; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java index b1dc748f587..68c0fa15855 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; @@ -51,14 +50,13 @@ public class MutableSegment extends Segment { * Adds the given cell into the segment * @param cell the cell to add * @param mslabUsed whether using MSLAB - * @param memstoreSize */ - public void add(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) { - internalAdd(cell, mslabUsed, memstoreSize); + public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing) { + internalAdd(cell, mslabUsed, memStoreSizing); } - public void upsert(Cell cell, long readpoint, MemStoreSize memstoreSize) { - internalAdd(cell, false, memstoreSize); + public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing) { + internalAdd(cell, false, memStoreSizing); // Get the Cells for the row/family/qualifier regardless of timestamp. // For this case we want to clean up any other puts @@ -90,8 +88,8 @@ public class MutableSegment extends Segment { int cellLen = getCellLength(cur); long heapSize = heapSizeChange(cur, true); this.incSize(-cellLen, -heapSize); - if (memstoreSize != null) { - memstoreSize.decMemStoreSize(cellLen, heapSize); + if (memStoreSizing != null) { + memStoreSizing.decMemStoreSize(cellLen, heapSize); } it.remove(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java index 8d1656f85b7..7689fcd61bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.Pair; */ @InterfaceAudience.Private public class RegionServerAccounting { - // memstore data size private final LongAdder globalMemstoreDataSize = new LongAdder(); // memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell @@ -46,7 +45,7 @@ public class RegionServerAccounting { // Store the edits size during replaying WAL. Use this to roll back the // global memstore size once a region opening failed. - private final ConcurrentMap replayEditsPerRegion = + private final ConcurrentMap replayEditsPerRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); private long globalMemStoreLimit; @@ -216,14 +215,14 @@ public class RegionServerAccounting { * @param memStoreSize the Memstore size will be added to replayEditsPerRegion. */ public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) { - MemStoreSize replayEdistsSize = replayEditsPerRegion.get(regionName); + MemStoreSizing replayEdistsSize = replayEditsPerRegion.get(regionName); // All ops on the same MemStoreSize object is going to be done by single thread, sequentially // only. First calls to this method to increment the per region reply edits size and then call // to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of // the region open operation. No need to handle multi thread issues on one region's entry in // this Map. if (replayEdistsSize == null) { - replayEdistsSize = new MemStoreSize(); + replayEdistsSize = new MemStoreSizing(); replayEditsPerRegion.put(regionName, replayEdistsSize); } replayEdistsSize.incMemStoreSize(memStoreSize); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 03145f164bf..6f41f86767d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -277,13 +277,13 @@ public abstract class Segment { return comparator; } - protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) { + protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) { boolean succ = getCellSet().add(cell); - updateMetaInfo(cell, succ, mslabUsed, memstoreSize); + updateMetaInfo(cell, succ, mslabUsed, memstoreSizing); } protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed, - MemStoreSize memstoreSize) { + MemStoreSizing memstoreSizing) { long cellSize = 0; // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger @@ -293,8 +293,8 @@ public abstract class Segment { } long heapSize = heapSizeChange(cellToAdd, succ); incSize(cellSize, heapSize); - if (memstoreSize != null) { - memstoreSize.incMemStoreSize(cellSize, heapSize); + if (memstoreSizing != null) { + memstoreSizing.incMemStoreSize(cellSize, heapSize); } getTimeRangeTracker().includeTimestamp(cellToAdd); minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId()); @@ -307,8 +307,8 @@ public abstract class Segment { } } - protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSize memstoreSize) { - updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSize); + protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSizing memstoreSizing) { + updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 63d1baae221..43836f48525 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -95,17 +95,18 @@ public final class SegmentFactory { // create flat immutable segment from non-flat immutable segment // for flattening public ImmutableSegment createImmutableSegmentByFlattening( - CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, MemStoreSize memstoreSize) { + CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, + MemStoreSizing memstoreSizing) { ImmutableSegment res = null; switch (idxType) { case CHUNK_MAP: - res = new CellChunkImmutableSegment(segment, memstoreSize); + res = new CellChunkImmutableSegment(segment, memstoreSizing); break; case CSLM_MAP: assert false; // non-flat segment can not be the result of flattening break; case ARRAY_MAP: - res = new CellArrayImmutableSegment(segment, memstoreSize); + res = new CellArrayImmutableSegment(segment, memstoreSizing); break; } return res; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java index 12b94b58a97..ce0075e0383 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java @@ -607,18 +607,18 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) { byte[] fam = Bytes.toBytes("testfamily"); byte[] qf = Bytes.toBytes("testqualifier"); - MemStoreSize memstoreSize = new MemStoreSize(); + MemStoreSizing memstoreSizing = new MemStoreSizing(); for (int i = 0; i < keys.length; i++) { long timestamp = System.currentTimeMillis(); Threads.sleep(1); // to make sure each kv gets a different ts byte[] row = Bytes.toBytes(keys[i]); byte[] val = Bytes.toBytes(keys[i] + i); KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); - hmc.add(kv, memstoreSize); + hmc.add(kv, memstoreSizing); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); } - regionServicesForStores.addMemStoreSize(memstoreSize); - return memstoreSize.getDataSize(); + regionServicesForStores.addMemStoreSize(memstoreSizing); + return memstoreSizing.getDataSize(); } private long cellBeforeFlushSize() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 0a974ae21d8..6197a082ccb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -126,9 +126,9 @@ public class TestDefaultMemStore { public void testPutSameCell() { byte[] bytes = Bytes.toBytes(getName()); KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes); - MemStoreSize sizeChangeForFirstCell = new MemStoreSize(); + MemStoreSizing sizeChangeForFirstCell = new MemStoreSizing(); this.memstore.add(kv, sizeChangeForFirstCell); - MemStoreSize sizeChangeForSecondCell = new MemStoreSize(); + MemStoreSizing sizeChangeForSecondCell = new MemStoreSizing(); this.memstore.add(kv, sizeChangeForSecondCell); // make sure memstore size increase won't double-count MSLAB chunk size assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getDataSize()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index eeed73c588d..b9054f4b53d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -230,7 +230,7 @@ public class TestHStore { * @throws Exception */ @Test - public void testFlushSizeAccounting() throws Exception { + public void testFlushSizeSizing() throws Exception { LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); final Configuration conf = HBaseConfiguration.create(); @@ -254,7 +254,7 @@ public class TestHStore { MemStoreSize size = store.memstore.getFlushableSize(); assertEquals(0, size.getDataSize()); LOG.info("Adding some data"); - MemStoreSize kvSize = new MemStoreSize(); + MemStoreSizing kvSize = new MemStoreSizing(); store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); // add the heap size of active (mutable) segment kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD); @@ -273,7 +273,7 @@ public class TestHStore { CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD); size = store.memstore.getFlushableSize(); assertEquals(kvSize, size); - MemStoreSize kvSize2 = new MemStoreSize(); + MemStoreSizing kvSize2 = new MemStoreSizing(); store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2); kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD); // Even though we add a new kv, we expect the flushable size to be 'same' since we have @@ -1217,7 +1217,7 @@ public class TestHStore { byte[] value0 = Bytes.toBytes("value0"); byte[] value1 = Bytes.toBytes("value1"); byte[] value2 = Bytes.toBytes("value2"); - MemStoreSize memStoreSize = new MemStoreSize(); + MemStoreSizing memStoreSizing = new MemStoreSizing(); long ts = EnvironmentEdgeManager.currentTime(); long seqId = 100; init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), @@ -1229,18 +1229,18 @@ public class TestHStore { } }); // The cells having the value0 won't be flushed to disk because the value of max version is 1 - store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSize); - store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSize); - store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSize); - store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSize); - store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSize); - store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSize); - store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSize); - store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSize); - store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSize); - store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSize); - store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSize); - store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSize); + store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); + store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); + store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); + store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); + store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); + store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); + store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing); + store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); + store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); + store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); List myList = new MyList<>(hook); Scan scan = new Scan() .withStartRow(r1) @@ -1276,13 +1276,13 @@ public class TestHStore { init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); byte[] value = Bytes.toBytes("value"); - MemStoreSize memStoreSize = new MemStoreSize(); + MemStoreSizing memStoreSizing = new MemStoreSizing(); long ts = EnvironmentEdgeManager.currentTime(); long seqId = 100; // older data whihc shouldn't be "seen" by client - store.add(createCell(qf1, ts, seqId, value), memStoreSize); - store.add(createCell(qf2, ts, seqId, value), memStoreSize); - store.add(createCell(qf3, ts, seqId, value), memStoreSize); + store.add(createCell(qf1, ts, seqId, value), memStoreSizing); + store.add(createCell(qf2, ts, seqId, value), memStoreSizing); + store.add(createCell(qf3, ts, seqId, value), memStoreSizing); TreeSet quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); quals.add(qf1); quals.add(qf2); @@ -1354,22 +1354,22 @@ public class TestHStore { }); byte[] oldValue = Bytes.toBytes("oldValue"); byte[] currentValue = Bytes.toBytes("currentValue"); - MemStoreSize memStoreSize = new MemStoreSize(); + MemStoreSizing memStoreSizing = new MemStoreSizing(); long ts = EnvironmentEdgeManager.currentTime(); long seqId = 100; // older data whihc shouldn't be "seen" by client - myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSize); - myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSize); - myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSize); + myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); + myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); + myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); long snapshotId = id++; // push older data into snapshot -- phase (1/4) StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId); storeFlushCtx.prepare(); // insert current data into active -- phase (2/4) - myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSize); - myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSize); - myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSize); + myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); + myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); + myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); TreeSet quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); quals.add(qf1); quals.add(qf2); @@ -1467,21 +1467,21 @@ public class TestHStore { init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); byte[] value = Bytes.toBytes("thisisavarylargevalue"); - MemStoreSize memStoreSize = new MemStoreSize(); + MemStoreSizing memStoreSizing = new MemStoreSizing(); long ts = EnvironmentEdgeManager.currentTime(); long seqId = 100; // older data whihc shouldn't be "seen" by client - store.add(createCell(qf1, ts, seqId, value), memStoreSize); - store.add(createCell(qf2, ts, seqId, value), memStoreSize); - store.add(createCell(qf3, ts, seqId, value), memStoreSize); + store.add(createCell(qf1, ts, seqId, value), memStoreSizing); + store.add(createCell(qf2, ts, seqId, value), memStoreSizing); + store.add(createCell(qf3, ts, seqId, value), memStoreSizing); assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); StoreFlushContext storeFlushCtx = store.createFlushContext(id++); storeFlushCtx.prepare(); // This shouldn't invoke another in-memory flush because the first compactor thread // hasn't accomplished the in-memory compaction. - store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); - store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); - store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); + store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); + store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); + store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); //okay. Let the compaction be completed MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); @@ -1490,9 +1490,9 @@ public class TestHStore { TimeUnit.SECONDS.sleep(1); } // This should invoke another in-memory flush. - store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); - store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); - store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); + store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); + store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); + store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); @@ -1589,25 +1589,25 @@ public class TestHStore { conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); // Set the lower threshold to invoke the "MERGE" policy MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); - MemStoreSize memStoreSize = new MemStoreSize(); + MemStoreSizing memStoreSizing = new MemStoreSizing(); long ts = System.currentTimeMillis(); long seqID = 1l; // Add some data to the region and do some flushes for (int i = 1; i < 10; i++) { store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), - memStoreSize); + memStoreSizing); } // flush them flushStore(store, seqID); for (int i = 11; i < 20; i++) { store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), - memStoreSize); + memStoreSizing); } // flush them flushStore(store, seqID); for (int i = 21; i < 30; i++) { store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), - memStoreSize); + memStoreSizing); } // flush them flushStore(store, seqID); @@ -1624,14 +1624,14 @@ public class TestHStore { // create more store files for (int i = 31; i < 40; i++) { store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), - memStoreSize); + memStoreSizing); } // flush them flushStore(store, seqID); for (int i = 41; i < 50; i++) { store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), - memStoreSize); + memStoreSizing); } // flush them flushStore(store, seqID); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 63c904d1678..d23dd1a97a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -77,8 +77,8 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.MemStoreSizing; import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; -import org.apache.hadoop.hbase.regionserver.MemStoreSize; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -543,8 +543,8 @@ public abstract class AbstractTestWALReplay { final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { @Override - protected void restoreEdit(HStore s, Cell cell, MemStoreSize memstoreSize) { - super.restoreEdit(s, cell, memstoreSize); + protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) { + super.restoreEdit(s, cell, memstoreSizing); countOfRestoredEdits.incrementAndGet(); } };