HBASE-21041 Memstore's heap size will be decreased to minus zero after flush

This commit is contained in:
Allan Yang 2018-08-22 22:54:14 +08:00
parent c07afa8875
commit 16ab716134
10 changed files with 149 additions and 23 deletions

View File

@ -54,8 +54,10 @@ public abstract class AbstractMemStore implements MemStore {
// Used to track when to flush
private volatile long timeOfOldestEdit;
protected RegionServicesForStores regionServices;
public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
+ (4 * ClassSize.REFERENCE)
+ (5 * ClassSize.REFERENCE)
+ (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
@ -72,17 +74,28 @@ public abstract class AbstractMemStore implements MemStore {
scanners.add(segment.getScanner(readPt));
}
protected AbstractMemStore(final Configuration conf, final CellComparator c) {
protected AbstractMemStore(final Configuration conf, final CellComparator c,
final RegionServicesForStores regionServices) {
this.conf = conf;
this.comparator = c;
this.regionServices = regionServices;
resetActive();
this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
this.snapshotId = NO_SNAPSHOT_ID;
}
protected void resetActive() {
// Record the MutableSegment' heap overhead when initialing
MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
// Reset heap to not include any keys
this.active = SegmentFactory.instance().createMutableSegment(conf, comparator);
this.active = SegmentFactory.instance()
.createMutableSegment(conf, comparator, memstoreAccounting);
// regionServices can be null when testing
if (regionServices != null) {
regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
memstoreAccounting.getHeapSize(),
memstoreAccounting.getOffHeapSize());
}
this.timeOfOldestEdit = Long.MAX_VALUE;
}

View File

@ -36,11 +36,14 @@ public class CSLMImmutableSegment extends ImmutableSegment {
* This C-tor should be used when active MutableSegment is pushed into the compaction
* pipeline and becomes an ImmutableSegment.
*/
protected CSLMImmutableSegment(Segment segment) {
protected CSLMImmutableSegment(Segment segment, MemStoreSizing memstoreSizing) {
super(segment);
// update the segment metadata heap size
long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM;
incMemStoreSize(0, indexOverhead, 0); // CSLM is always on-heap
if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(0, indexOverhead, 0);
}
}
@Override

View File

@ -66,7 +66,6 @@ public class CompactingMemStore extends AbstractMemStore {
private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
private HStore store;
private RegionServicesForStores regionServices;
private CompactionPipeline pipeline;
protected MemStoreCompactor compactor;
@ -93,7 +92,7 @@ public class CompactingMemStore extends AbstractMemStore {
private IndexType indexType = IndexType.ARRAY_MAP; // default implementation
public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
+ 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
+ 6 * ClassSize.REFERENCE // Store, CompactionPipeline,
// MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
// indexType
+ Bytes.SIZEOF_LONG // inmemoryFlushSize
@ -104,7 +103,7 @@ public class CompactingMemStore extends AbstractMemStore {
public CompactingMemStore(Configuration conf, CellComparator c,
HStore store, RegionServicesForStores regionServices,
MemoryCompactionPolicy compactionPolicy) throws IOException {
super(conf, c);
super(conf, c, regionServices);
this.store = store;
this.regionServices = regionServices;
this.pipeline = new CompactionPipeline(getRegionServices());

View File

@ -73,8 +73,15 @@ public class CompactionPipeline {
}
public boolean pushHead(MutableSegment segment) {
// Record the ImmutableSegment' heap overhead when initialing
MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment immutableSegment = SegmentFactory.instance().
createImmutableSegment(segment);
createImmutableSegment(segment, memstoreAccounting);
if (region != null) {
region.addMemStoreSize(memstoreAccounting.getDataSize(),
memstoreAccounting.getHeapSize(),
memstoreAccounting.getOffHeapSize());
}
synchronized (pipeline){
boolean res = addFirst(immutableSegment);
readOnlyCopy = new LinkedList<>(pipeline);

View File

@ -63,7 +63,7 @@ public class DefaultMemStore extends AbstractMemStore {
* Default constructor. Used for tests.
*/
public DefaultMemStore() {
this(HBaseConfiguration.create(), CellComparator.getInstance());
this(HBaseConfiguration.create(), CellComparator.getInstance(), null);
}
/**
@ -71,7 +71,16 @@ public class DefaultMemStore extends AbstractMemStore {
* @param c Comparator
*/
public DefaultMemStore(final Configuration conf, final CellComparator c) {
super(conf, c);
super(conf, c, null);
}
/**
* Constructor.
* @param c Comparator
*/
public DefaultMemStore(final Configuration conf, final CellComparator c,
final RegionServicesForStores regionServices) {
super(conf, c, regionServices);
}
/**
@ -88,8 +97,16 @@ public class DefaultMemStore extends AbstractMemStore {
} else {
this.snapshotId = EnvironmentEdgeManager.currentTime();
if (!this.active.isEmpty()) {
// Record the ImmutableSegment' heap overhead when initialing
MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment immutableSegment = SegmentFactory.instance().
createImmutableSegment(this.active);
createImmutableSegment(this.active, memstoreAccounting);
// regionServices can be null when testing
if (regionServices != null) {
regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
memstoreAccounting.getHeapSize(),
memstoreAccounting.getOffHeapSize());
}
this.snapshot = immutableSegment;
resetActive();
}

View File

@ -348,7 +348,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
switch (inMemoryCompaction) {
case NONE:
ms = ReflectionUtils.newInstance(DefaultMemStore.class,
new Object[]{conf, this.comparator});
new Object[] { conf, this.comparator,
this.getHRegion().getRegionServicesForStores()});
break;
default:
Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,

View File

@ -42,9 +42,13 @@ public class MutableSegment extends Segment {
+ ClassSize.CONCURRENT_SKIPLISTMAP
+ ClassSize.SYNC_TIMERANGE_TRACKER;
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
protected MutableSegment(CellSet cellSet, CellComparator comparator,
MemStoreLAB memStoreLAB, MemStoreSizing memstoreSizing) {
super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
incMemStoreSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata
if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(0, DEEP_OVERHEAD, 0);
}
}
/**

View File

@ -60,22 +60,30 @@ public final class SegmentFactory {
conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
}
// create empty immutable segment
// for initializations
/**
* create empty immutable segment for initializations
* This ImmutableSegment is used as a place holder for snapshot in Memstore.
* It won't flush later, So it is not necessary to record the initial size
* for it.
* @param comparator comparator
* @return ImmutableSegment
*/
public ImmutableSegment createImmutableSegment(CellComparator comparator) {
MutableSegment segment = generateMutableSegment(null, comparator, null);
return createImmutableSegment(segment);
MutableSegment segment = generateMutableSegment(null, comparator, null, null);
return createImmutableSegment(segment, null);
}
// create not-flat immutable segment from mutable segment
public ImmutableSegment createImmutableSegment(MutableSegment segment) {
return new CSLMImmutableSegment(segment);
public ImmutableSegment createImmutableSegment(MutableSegment segment,
MemStoreSizing memstoreSizing) {
return new CSLMImmutableSegment(segment, memstoreSizing);
}
// create mutable segment
public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) {
public MutableSegment createMutableSegment(final Configuration conf,
CellComparator comparator, MemStoreSizing memstoreSizing) {
MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
return generateMutableSegment(conf, comparator, memStoreLAB);
return generateMutableSegment(conf, comparator, memStoreLAB, memstoreSizing);
}
// create new flat immutable segment from merging old immutable segments
@ -135,10 +143,10 @@ public final class SegmentFactory {
}
private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator,
MemStoreLAB memStoreLAB) {
MemStoreLAB memStoreLAB, MemStoreSizing memstoreSizing) {
// TBD use configuration to set type of segment
CellSet set = new CellSet(comparator);
return new MutableSegment(set, comparator, memStoreLAB);
return new MutableSegment(set, comparator, memStoreLAB, memstoreSizing);
}
private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {

View File

@ -424,6 +424,36 @@ public class TestHRegion {
HBaseTestingUtility.closeRegionAndWAL(region);
}
/**
* A test case of HBASE-21041
* @throws Exception Exception
*/
@Test
public void testFlushAndMemstoreSizeCounting() throws Exception {
byte[] family = Bytes.toBytes("family");
this.region = initHRegion(tableName, method, CONF, family);
final WALFactory wals = new WALFactory(CONF, method);
try {
for (byte[] row : HBaseTestingUtility.ROWS) {
Put put = new Put(row);
put.addColumn(family, family, row);
region.put(put);
}
region.flush(true);
// After flush, data size should be zero
assertEquals(0, region.getMemStoreDataSize());
// After flush, a new active mutable segment is created, so the heap size
// should equal to MutableSegment.DEEP_OVERHEAD
assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
// After flush, offheap should be zero
assertEquals(0, region.getMemStoreOffHeapSize());
} finally {
HBaseTestingUtility.closeRegionAndWAL(this.region);
this.region = null;
wals.close();
}
}
/**
* Test we do not lose data if we fail a flush and then close.
* Part of HBase-10466. Tests the following from the issue description:

View File

@ -22,14 +22,20 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A test similar to TestHRegion, but with in-memory flush families.
* Also checks wal truncation after in-memory compaction.
@ -60,5 +66,43 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion {
return TEST_UTIL.createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey,
isReadOnly, durability, wal, inMemory, families);
}
/**
* A test case of HBASE-21041
* @throws Exception Exception
*/
@Override
@Test
public void testFlushAndMemstoreSizeCounting() throws Exception {
byte[] family = Bytes.toBytes("family");
this.region = initHRegion(tableName, method, CONF, family);
final WALFactory wals = new WALFactory(CONF, method);
int count = 0;
try {
for (byte[] row : HBaseTestingUtility.ROWS) {
Put put = new Put(row);
put.addColumn(family, family, row);
region.put(put);
//In memory flush every 1000 puts
if (count++ % 1000 == 0) {
((CompactingMemStore) (region.getStore(family).memstore))
.flushInMemory();
}
}
region.flush(true);
// After flush, data size should be zero
Assert.assertEquals(0, region.getMemStoreDataSize());
// After flush, a new active mutable segment is created, so the heap size
// should equal to MutableSegment.DEEP_OVERHEAD
Assert.assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
// After flush, offheap size should be zero
Assert.assertEquals(0, region.getMemStoreOffHeapSize());
} finally {
HBaseTestingUtility.closeRegionAndWAL(this.region);
this.region = null;
wals.close();
}
}
}