From 77a6bf3b33482ba7cee2aa7c5b57b9d7b663f89f Mon Sep 17 00:00:00 2001 From: Allan Yang Date: Wed, 22 Aug 2018 23:19:11 +0800 Subject: [PATCH] HBASE-21041 Memstore's heap size will be decreased to minus zero after flush --- .../hbase/regionserver/AbstractMemStore.java | 18 ++++++-- .../regionserver/CSLMImmutableSegment.java | 5 ++- .../regionserver/CompactingMemStore.java | 5 +-- .../regionserver/CompactionPipeline.java | 9 +++- .../hbase/regionserver/DefaultMemStore.java | 23 ++++++++-- .../hadoop/hbase/regionserver/HStore.java | 3 +- .../hbase/regionserver/MutableSegment.java | 6 ++- .../hbase/regionserver/SegmentFactory.java | 28 +++++++----- .../hbase/regionserver/TestHRegion.java | 30 +++++++++++++ .../TestHRegionWithInMemoryFlush.java | 45 +++++++++++++++++++ 10 files changed, 149 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index a7a1af88ff1..e3599254752 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -54,8 +54,10 @@ public abstract class AbstractMemStore implements MemStore { // Used to track when to flush private volatile long timeOfOldestEdit; + protected RegionServicesForStores regionServices; + public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT - + (4 * ClassSize.REFERENCE) + + (5 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; @@ -72,17 +74,27 @@ public abstract class AbstractMemStore implements MemStore { scanners.add(segment.getScanner(readPt)); } - protected AbstractMemStore(final Configuration conf, final CellComparator c) { + protected AbstractMemStore(final Configuration conf, final CellComparator c, + final RegionServicesForStores regionServices) { this.conf = conf; this.comparator = c; + this.regionServices = regionServices; resetActive(); this.snapshot = SegmentFactory.instance().createImmutableSegment(c); this.snapshotId = NO_SNAPSHOT_ID; } protected void resetActive() { + // Record the MutableSegment' heap overhead when initialing + MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); // Reset heap to not include any keys - active = SegmentFactory.instance().createMutableSegment(conf, comparator); + active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting); + // regionServices can be null when testing + if (regionServices != null) { + regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), + memstoreAccounting.getHeapSize(), + memstoreAccounting.getOffHeapSize()); + } timeOfOldestEdit = Long.MAX_VALUE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java index 9e206ea67e1..855fd084d2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java @@ -36,11 +36,14 @@ public class CSLMImmutableSegment extends ImmutableSegment { * This C-tor should be used when active MutableSegment is pushed into the compaction * pipeline and becomes an ImmutableSegment. */ - protected CSLMImmutableSegment(Segment segment) { + protected CSLMImmutableSegment(Segment segment, MemStoreSizing memstoreSizing) { super(segment); // update the segment metadata heap size long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM; incMemStoreSize(0, indexOverhead, 0); // CSLM is always on-heap + if (memstoreSizing != null) { + memstoreSizing.incMemStoreSize(0, indexOverhead, 0); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 2eb05b463f1..00d5273f914 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -66,7 +66,6 @@ public class CompactingMemStore extends AbstractMemStore { private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class); private HStore store; - private RegionServicesForStores regionServices; private CompactionPipeline pipeline; protected MemStoreCompactor compactor; @@ -93,7 +92,7 @@ public class CompactingMemStore extends AbstractMemStore { private IndexType indexType = IndexType.ARRAY_MAP; // default implementation public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD - + 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, + + 6 * ClassSize.REFERENCE // Store, CompactionPipeline, // MemStoreCompactor, inMemoryCompactionInProgress, // allowCompaction, indexType + Bytes.SIZEOF_LONG // inmemoryFlushSize @@ -104,7 +103,7 @@ public class CompactingMemStore extends AbstractMemStore { public CompactingMemStore(Configuration conf, CellComparator c, HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) throws IOException { - super(conf, c); + super(conf, c, regionServices); this.store = store; this.regionServices = regionServices; this.pipeline = new CompactionPipeline(getRegionServices()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 1131c3c98d4..216f7c3b89d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -72,8 +72,15 @@ public class CompactionPipeline { } public boolean pushHead(MutableSegment segment) { + // Record the ImmutableSegment' heap overhead when initialing + MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); ImmutableSegment immutableSegment = SegmentFactory.instance(). - createImmutableSegment(segment); + createImmutableSegment(segment, memstoreAccounting); + if (region != null) { + region.addMemStoreSize(memstoreAccounting.getDataSize(), + memstoreAccounting.getHeapSize(), + memstoreAccounting.getOffHeapSize()); + } synchronized (pipeline){ boolean res = addFirst(immutableSegment); readOnlyCopy = new LinkedList<>(pipeline); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 97170fbeab7..a006ecbe80a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -63,7 +63,7 @@ public class DefaultMemStore extends AbstractMemStore { * Default constructor. Used for tests. */ public DefaultMemStore() { - this(HBaseConfiguration.create(), CellComparator.getInstance()); + this(HBaseConfiguration.create(), CellComparator.getInstance(), null); } /** @@ -71,7 +71,16 @@ public class DefaultMemStore extends AbstractMemStore { * @param c Comparator */ public DefaultMemStore(final Configuration conf, final CellComparator c) { - super(conf, c); + super(conf, c, null); + } + + /** + * Constructor. + * @param c Comparator + */ + public DefaultMemStore(final Configuration conf, final CellComparator c, + final RegionServicesForStores regionServices) { + super(conf, c, regionServices); } /** @@ -88,8 +97,16 @@ public class DefaultMemStore extends AbstractMemStore { } else { this.snapshotId = EnvironmentEdgeManager.currentTime(); if (!getActive().isEmpty()) { + // Record the ImmutableSegment' heap overhead when initialing + MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); ImmutableSegment immutableSegment = SegmentFactory.instance(). - createImmutableSegment(getActive()); + createImmutableSegment(getActive(), memstoreAccounting); + // regionServices can be null when testing + if (regionServices != null) { + regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), + memstoreAccounting.getHeapSize(), + memstoreAccounting.getOffHeapSize()); + } this.snapshot = immutableSegment; resetActive(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 73b992e79d0..1ff90430928 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -351,7 +351,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat switch (inMemoryCompaction) { case NONE: ms = ReflectionUtils.newInstance(DefaultMemStore.class, - new Object[]{conf, this.comparator}); + new Object[] { conf, this.comparator, + this.getHRegion().getRegionServicesForStores()}); break; default: Class clz = conf.getClass(MEMSTORE_CLASS_NAME, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java index d76321d78a9..f482cbb4b7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -47,9 +47,13 @@ public class MutableSegment extends Segment { + ClassSize.REFERENCE + ClassSize.ATOMIC_BOOLEAN); - protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { + protected MutableSegment(CellSet cellSet, CellComparator comparator, + MemStoreLAB memStoreLAB, MemStoreSizing memstoreSizing) { super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC)); incMemStoreSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata + if (memstoreSizing != null) { + memstoreSizing.incMemStoreSize(0, DEEP_OVERHEAD, 0); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index db0b319cc75..26b7ecc1320 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -60,22 +60,30 @@ public final class SegmentFactory { conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType); } - // create empty immutable segment - // for initializations + /** + * create empty immutable segment for initializations + * This ImmutableSegment is used as a place holder for snapshot in Memstore. + * It won't flush later, So it is not necessary to record the initial size + * for it. + * @param comparator comparator + * @return ImmutableSegment + */ public ImmutableSegment createImmutableSegment(CellComparator comparator) { - MutableSegment segment = generateMutableSegment(null, comparator, null); - return createImmutableSegment(segment); + MutableSegment segment = generateMutableSegment(null, comparator, null, null); + return createImmutableSegment(segment, null); } // create not-flat immutable segment from mutable segment - public ImmutableSegment createImmutableSegment(MutableSegment segment) { - return new CSLMImmutableSegment(segment); + public ImmutableSegment createImmutableSegment(MutableSegment segment, + MemStoreSizing memstoreSizing) { + return new CSLMImmutableSegment(segment, memstoreSizing); } // create mutable segment - public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) { + public MutableSegment createMutableSegment(final Configuration conf, + CellComparator comparator, MemStoreSizing memstoreSizing) { MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf); - return generateMutableSegment(conf, comparator, memStoreLAB); + return generateMutableSegment(conf, comparator, memStoreLAB, memstoreSizing); } // create new flat immutable segment from merging old immutable segments @@ -135,10 +143,10 @@ public final class SegmentFactory { } private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator, - MemStoreLAB memStoreLAB) { + MemStoreLAB memStoreLAB, MemStoreSizing memstoreSizing) { // TBD use configuration to set type of segment CellSet set = new CellSet(comparator); - return new MutableSegment(set, comparator, memStoreLAB); + return new MutableSegment(set, comparator, memStoreLAB, memstoreSizing); } private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List segments) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index da07c7b7528..c5c0a346dcf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -423,6 +423,36 @@ public class TestHRegion { HBaseTestingUtility.closeRegionAndWAL(region); } + /** + * A test case of HBASE-21041 + * @throws Exception Exception + */ + @Test + public void testFlushAndMemstoreSizeCounting() throws Exception { + byte[] family = Bytes.toBytes("family"); + this.region = initHRegion(tableName, method, CONF, family); + final WALFactory wals = new WALFactory(CONF, method); + try { + for (byte[] row : HBaseTestingUtility.ROWS) { + Put put = new Put(row); + put.addColumn(family, family, row); + region.put(put); + } + region.flush(true); + // After flush, data size should be zero + assertEquals(0, region.getMemStoreDataSize()); + // After flush, a new active mutable segment is created, so the heap size + // should equal to MutableSegment.DEEP_OVERHEAD + assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize()); + // After flush, offheap should be zero + assertEquals(0, region.getMemStoreOffHeapSize()); + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + wals.close(); + } + } + /** * Test we do not lose data if we fail a flush and then close. * Part of HBase-10466. Tests the following from the issue description: diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java index 84f79733c5c..ca7629b9366 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java @@ -22,12 +22,18 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.Assert; import org.junit.ClassRule; +import org.junit.Test; import org.junit.experimental.categories.Category; + /** * A test similar to TestHRegion, but with in-memory flush families. * Also checks wal truncation after in-memory compaction. @@ -53,5 +59,44 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion { return TEST_UTIL.createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, isReadOnly, durability, wal, inMemory, families); } + + + /** + * A test case of HBASE-21041 + * @throws Exception Exception + */ + @Override + @Test + public void testFlushAndMemstoreSizeCounting() throws Exception { + byte[] family = Bytes.toBytes("family"); + this.region = initHRegion(tableName, method, CONF, family); + final WALFactory wals = new WALFactory(CONF, method); + int count = 0; + try { + for (byte[] row : HBaseTestingUtility.ROWS) { + Put put = new Put(row); + put.addColumn(family, family, row); + region.put(put); + //In memory flush every 1000 puts + if (count++ % 1000 == 0) { + ((CompactingMemStore) (region.getStore(family).memstore)) + .flushInMemory(); + } + } + region.flush(true); + // After flush, data size should be zero + Assert.assertEquals(0, region.getMemStoreDataSize()); + // After flush, a new active mutable segment is created, so the heap size + // should equal to MutableSegment.DEEP_OVERHEAD + Assert.assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize()); + // After flush, offheap size should be zero + Assert.assertEquals(0, region.getMemStoreOffHeapSize()); + + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + wals.close(); + } + } }