HBASE-26488 Memory leak when MemStore retry flushing (#3899)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
8fa8344a8c
commit
e20b2f3df5
|
@ -2504,7 +2504,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
long snapshotId = -1; // -1 means do not drop
|
long snapshotId = -1; // -1 means do not drop
|
||||||
if (dropMemstoreSnapshot && snapshot != null) {
|
if (dropMemstoreSnapshot && snapshot != null) {
|
||||||
snapshotId = snapshot.getId();
|
snapshotId = snapshot.getId();
|
||||||
snapshot.close();
|
|
||||||
}
|
}
|
||||||
HStore.this.updateStorefiles(storeFiles, snapshotId);
|
HStore.this.updateStorefiles(storeFiles, snapshotId);
|
||||||
}
|
}
|
||||||
|
@ -2515,10 +2514,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
@Override
|
@Override
|
||||||
public void abort() throws IOException {
|
public void abort() throws IOException {
|
||||||
if (snapshot != null) {
|
if (snapshot != null) {
|
||||||
//We need to close the snapshot when aborting, otherwise, the segment scanner
|
|
||||||
//won't be closed. If we are using MSLAB, the chunk referenced by those scanners
|
|
||||||
//can't be released, thus memory leak
|
|
||||||
snapshot.close();
|
|
||||||
HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
|
HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,6 +85,10 @@ public abstract class ImmutableSegment extends Segment {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We create a new {@link SnapshotSegmentScanner} to increase the reference count of
|
||||||
|
* {@link MemStoreLABImpl} used by this segment.
|
||||||
|
*/
|
||||||
List<KeyValueScanner> getSnapshotScanners() {
|
List<KeyValueScanner> getSnapshotScanners() {
|
||||||
return Collections.singletonList(new SnapshotSegmentScanner(this));
|
return Collections.singletonList(new SnapshotSegmentScanner(this));
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,31 +17,38 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.util.List;
|
|
||||||
/**
|
/**
|
||||||
* Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
|
* {@link MemStoreSnapshot} is a Context Object to hold details of the snapshot taken on a MemStore.
|
||||||
* count of cells in it and total memory size occupied by all the cells, timestamp information of
|
* Details include the snapshot's identifier, count of cells in it and total memory size occupied by
|
||||||
* all the cells and a scanner to read all cells in it.
|
* all the cells, timestamp information of all the cells and the snapshot immutableSegment.
|
||||||
|
* <p>
|
||||||
|
* NOTE:Every time when {@link MemStoreSnapshot#getScanners} is called, we create new
|
||||||
|
* {@link SnapshotSegmentScanner}s on the {@link MemStoreSnapshot#snapshotImmutableSegment},and
|
||||||
|
* {@link Segment#incScannerCount} is invoked in the {@link SnapshotSegmentScanner} ctor to increase
|
||||||
|
* the reference count of {@link MemStoreLAB} which used by
|
||||||
|
* {@link MemStoreSnapshot#snapshotImmutableSegment}, so after we finish using these scanners, we
|
||||||
|
* must call their close method to invoke {@link Segment#decScannerCount}.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class MemStoreSnapshot implements Closeable {
|
public class MemStoreSnapshot {
|
||||||
private final long id;
|
private final long id;
|
||||||
private final int cellsCount;
|
private final int cellsCount;
|
||||||
private final MemStoreSize memStoreSize;
|
private final MemStoreSize memStoreSize;
|
||||||
private final TimeRangeTracker timeRangeTracker;
|
private final TimeRangeTracker timeRangeTracker;
|
||||||
private final List<KeyValueScanner> scanners;
|
|
||||||
private final boolean tagsPresent;
|
private final boolean tagsPresent;
|
||||||
|
private final ImmutableSegment snapshotImmutableSegment;
|
||||||
|
|
||||||
public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
|
public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.cellsCount = snapshot.getCellsCount();
|
this.cellsCount = snapshot.getCellsCount();
|
||||||
this.memStoreSize = snapshot.getMemStoreSize();
|
this.memStoreSize = snapshot.getMemStoreSize();
|
||||||
this.timeRangeTracker = snapshot.getTimeRangeTracker();
|
this.timeRangeTracker = snapshot.getTimeRangeTracker();
|
||||||
this.scanners = snapshot.getSnapshotScanners();
|
|
||||||
this.tagsPresent = snapshot.isTagsPresent();
|
this.tagsPresent = snapshot.isTagsPresent();
|
||||||
|
this.snapshotImmutableSegment = snapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -74,10 +81,16 @@ public class MemStoreSnapshot implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return {@link KeyValueScanner} for iterating over the snapshot
|
* Create new {@link SnapshotSegmentScanner}s for iterating over the snapshot. <br/>
|
||||||
|
* NOTE:Here when create new {@link SnapshotSegmentScanner}s, {@link Segment#incScannerCount} is
|
||||||
|
* invoked in the {@link SnapshotSegmentScanner} ctor,so after we use these
|
||||||
|
* {@link SnapshotSegmentScanner}s, we must call {@link SnapshotSegmentScanner#close} to invoke
|
||||||
|
* {@link Segment#decScannerCount}.
|
||||||
|
* @return {@link KeyValueScanner}s(Which type is {@link SnapshotSegmentScanner}) for iterating
|
||||||
|
* over the snapshot.
|
||||||
*/
|
*/
|
||||||
public List<KeyValueScanner> getScanners() {
|
public List<KeyValueScanner> getScanners() {
|
||||||
return scanners;
|
return snapshotImmutableSegment.getSnapshotScanners();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -86,13 +99,4 @@ public class MemStoreSnapshot implements Closeable {
|
||||||
public boolean isTagsPresent() {
|
public boolean isTagsPresent() {
|
||||||
return this.tagsPresent;
|
return this.tagsPresent;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
if (this.scanners != null) {
|
|
||||||
for (KeyValueScanner scanner : scanners) {
|
|
||||||
scanner.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
@ -784,11 +785,12 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void flushStore(HStore store, long id) throws IOException {
|
private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
|
||||||
StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
|
StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
|
||||||
storeFlushCtx.prepare();
|
storeFlushCtx.prepare();
|
||||||
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
|
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
|
||||||
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
|
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
|
||||||
|
return storeFlushCtx;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2238,7 +2240,7 @@ public class TestHStore {
|
||||||
flushThread.join();
|
flushThread.join();
|
||||||
|
|
||||||
if (myDefaultMemStore.shouldWait) {
|
if (myDefaultMemStore.shouldWait) {
|
||||||
SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
|
SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
|
||||||
MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
|
MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
|
||||||
assertTrue(memStoreLAB.isClosed());
|
assertTrue(memStoreLAB.isClosed());
|
||||||
assertTrue(!memStoreLAB.chunks.isEmpty());
|
assertTrue(!memStoreLAB.chunks.isEmpty());
|
||||||
|
@ -2265,16 +2267,16 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
|
@SuppressWarnings("unchecked")
|
||||||
List<SegmentScanner> segmentScanners = new ArrayList<SegmentScanner>();
|
private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
|
||||||
|
List<T> resultScanners = new ArrayList<T>();
|
||||||
for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
|
for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
|
||||||
if (keyValueScanner instanceof SegmentScanner) {
|
if (keyValueScannerClass.isInstance(keyValueScanner)) {
|
||||||
segmentScanners.add((SegmentScanner) keyValueScanner);
|
resultScanners.add((T) keyValueScanner);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
assertTrue(resultScanners.size() == 1);
|
||||||
assertTrue(segmentScanners.size() == 1);
|
return resultScanners.get(0);
|
||||||
return segmentScanners.get(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -2326,6 +2328,116 @@ public class TestHStore {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test is for HBASE-26488
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
|
||||||
|
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
||||||
|
byte[] smallValue = new byte[3];
|
||||||
|
byte[] largeValue = new byte[9];
|
||||||
|
final long timestamp = EnvironmentEdgeManager.currentTime();
|
||||||
|
final long seqId = 100;
|
||||||
|
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
|
||||||
|
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
|
||||||
|
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
quals.add(qf1);
|
||||||
|
quals.add(qf2);
|
||||||
|
|
||||||
|
conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
|
||||||
|
conf.setBoolean(WALFactory.WAL_ENABLED, false);
|
||||||
|
conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
|
||||||
|
MyDefaultStoreFlusher.class.getName());
|
||||||
|
|
||||||
|
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
|
||||||
|
MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
|
||||||
|
assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
|
||||||
|
|
||||||
|
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||||
|
store.add(smallCell, memStoreSizing);
|
||||||
|
store.add(largeCell, memStoreSizing);
|
||||||
|
flushStore(store, id++);
|
||||||
|
|
||||||
|
MemStoreLABImpl memStoreLAB =
|
||||||
|
(MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
|
||||||
|
assertTrue(memStoreLAB.isClosed());
|
||||||
|
assertTrue(memStoreLAB.getOpenScannerCount() == 0);
|
||||||
|
assertTrue(memStoreLAB.isReclaimed());
|
||||||
|
assertTrue(memStoreLAB.chunks.isEmpty());
|
||||||
|
StoreScanner storeScanner = null;
|
||||||
|
try {
|
||||||
|
storeScanner =
|
||||||
|
(StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
|
||||||
|
assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
|
||||||
|
assertTrue(store.memstore.size().getCellsCount() == 0);
|
||||||
|
assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
|
||||||
|
assertTrue(storeScanner.currentScanners.size() == 1);
|
||||||
|
assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
|
||||||
|
|
||||||
|
List<Cell> results = new ArrayList<>();
|
||||||
|
storeScanner.next(results);
|
||||||
|
assertEquals(2, results.size());
|
||||||
|
CellUtil.equals(smallCell, results.get(0));
|
||||||
|
CellUtil.equals(largeCell, results.get(1));
|
||||||
|
} finally {
|
||||||
|
if (storeScanner != null) {
|
||||||
|
storeScanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static class MyDefaultMemStore1 extends DefaultMemStore {
|
||||||
|
|
||||||
|
private ImmutableSegment snapshotImmutableSegment;
|
||||||
|
|
||||||
|
public MyDefaultMemStore1(Configuration conf, CellComparator c,
|
||||||
|
RegionServicesForStores regionServices) {
|
||||||
|
super(conf, c, regionServices);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MemStoreSnapshot snapshot() {
|
||||||
|
MemStoreSnapshot result = super.snapshot();
|
||||||
|
this.snapshotImmutableSegment = snapshot;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
|
||||||
|
private static final AtomicInteger failCounter = new AtomicInteger(1);
|
||||||
|
private static final AtomicInteger counter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
public MyDefaultStoreFlusher(Configuration conf, HStore store) {
|
||||||
|
super(conf, store);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
|
||||||
|
MonitoredTask status, ThroughputController throughputController,
|
||||||
|
FlushLifeCycleTracker tracker) throws IOException {
|
||||||
|
counter.incrementAndGet();
|
||||||
|
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void performFlush(InternalScanner scanner, final CellSink sink,
|
||||||
|
ThroughputController throughputController) throws IOException {
|
||||||
|
|
||||||
|
final int currentCount = counter.get();
|
||||||
|
CellSink newCellSink = (cell) -> {
|
||||||
|
if (currentCount <= failCounter.get()) {
|
||||||
|
throw new IOException("Simulated exception by tests");
|
||||||
|
}
|
||||||
|
sink.append(cell);
|
||||||
|
};
|
||||||
|
super.performFlush(scanner, newCellSink, throughputController);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private HStoreFile mockStoreFileWithLength(long length) {
|
private HStoreFile mockStoreFileWithLength(long length) {
|
||||||
HStoreFile sf = mock(HStoreFile.class);
|
HStoreFile sf = mock(HStoreFile.class);
|
||||||
StoreFileReader sfr = mock(StoreFileReader.class);
|
StoreFileReader sfr = mock(StoreFileReader.class);
|
||||||
|
@ -3109,7 +3221,5 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue