HBASE-26494 Using RefCnt to fix the flawed MemStoreLABImpl (#3983)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
chenglei 2022-01-01 20:50:52 +08:00 committed by Duo Zhang
parent ede518c295
commit efeec919de
5 changed files with 154 additions and 41 deletions

View File

@ -68,7 +68,7 @@ public class CompactionPipeline {
/**
* <pre>
* Version is volatile to ensure it is atomically read when not using a lock.
* To indicate whether the suffix of pipleline changes
* To indicate whether the suffix of pipeline changes:
* 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
* added at Head, {@link #version} not change.
* 2.for {@link CompactionPipeline#swap},{@link #version} increase.

View File

@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.errorprone.annotations.RestrictedApi;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -31,13 +33,16 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class ImmutableMemStoreLAB implements MemStoreLAB {
private final AtomicInteger openScannerCount = new AtomicInteger();
private volatile boolean closed = false;
private final RefCnt refCnt;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final List<MemStoreLAB> mslabs;
public ImmutableMemStoreLAB(List<MemStoreLAB> mslabs) {
this.mslabs = mslabs;
this.refCnt = RefCnt.create(() -> {
closeMSLABs();
});
}
@Override
@ -85,47 +90,43 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
@Override
public void close() {
// 'openScannerCount' here tracks the scanners opened on segments which directly refer to this
// MSLAB. The individual MSLABs this refers also having its own 'openScannerCount'. The usage of
// 'refCnt' here tracks the scanners opened on segments which directly refer to this
// MSLAB. The individual MSLABs this refers also having its own 'refCnt'. The usage of
// the variable in close() and decScannerCount() is as as that in HeapMemstoreLAB. Here the
// close just delegates the call to the individual MSLABs. The actual return of the chunks to
// MSLABPool will happen within individual MSLABs only (which is at the leaf level).
// Say an ImmutableMemStoreLAB is created over 2 HeapMemStoreLABs at some point and at that time
// both of them were referred by ongoing scanners. So they have > 0 'openScannerCount'. Now over
// the new Segment some scanners come in and this MSLABs 'openScannerCount' also goes up and
// both of them were referred by ongoing scanners. So they have > 0 'refCnt'. Now over
// the new Segment some scanners come in and this MSLABs 'refCnt' also goes up and
// then come down on finish of scanners. Now a close() call comes to this Immutable MSLAB. As
// it's 'openScannerCount' is zero it will call close() on both of the Heap MSLABs. Say by that
// it's 'refCnt' is zero it will call close() on both of the Heap MSLABs. Say by that
// time the old scanners on one of the MSLAB got over where as on the other, still an old
// scanner is going on. The call close() on that MSLAB will not close it immediately but will
// just mark it for closure as it's 'openScannerCount' still > 0. Later once the old scan is
// just decrease it's 'refCnt' and it's 'refCnt' still > 0. Later once the old scan is
// over, the decScannerCount() call will do the actual close and return of the chunks.
this.closed = true;
if (!this.closed.compareAndSet(false, true)) {
return;
}
// When there are still on going scanners over this MSLAB, we will defer the close until all
// scanners finish. We will just mark it for closure. See #decScannerCount(). This will be
// called at end of every scan. When it is marked for closure and scanner count reached 0, we
// will do the actual close then.
checkAndCloseMSLABs(openScannerCount.get());
// scanners finish. We will just decrease it's 'refCnt'. See #decScannerCount(). This will be
// called at end of every scan. When it's 'refCnt' reached 0, we will do the actual close then.
this.refCnt.release();
}
private void checkAndCloseMSLABs(int openScanners) {
if (openScanners == 0) {
for (MemStoreLAB mslab : this.mslabs) {
mslab.close();
}
private void closeMSLABs() {
for (MemStoreLAB mslab : this.mslabs) {
mslab.close();
}
}
@Override
public void incScannerCount() {
this.openScannerCount.incrementAndGet();
this.refCnt.retain();
}
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
if (this.closed) {
checkAndCloseMSLABs(count);
}
this.refCnt.release();
}
@Override
@ -138,5 +139,17 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
return ChunkCreator.getInstance().isOffheap();
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
int getRefCntValue() {
return this.refCnt.refCnt();
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
boolean isClosed() {
return this.closed.get();
}
}

View File

@ -18,13 +18,13 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.errorprone.annotations.RestrictedApi;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -85,8 +86,11 @@ public class MemStoreLABImpl implements MemStoreLAB {
// This flag is for reclaiming chunks. Its set when putting chunks back to
// pool
private final AtomicBoolean reclaimed = new AtomicBoolean(false);
// Current count of open scanners which reading data from this MemStoreLAB
private final AtomicInteger openScannerCount = new AtomicInteger();
/**
* Its initial value is 1, so it is one bigger than the current count of open scanners which
* reading data from this MemStoreLAB.
*/
private final RefCnt refCnt;
// Used in testing
public MemStoreLABImpl() {
@ -100,6 +104,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument(maxAlloc <= dataChunkSize,
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
this.refCnt = RefCnt.create(() -> {
recycleChunks();
});
// if user requested to work with MSLABs (whether on- or off-heap), then the
// immutable segments are going to use CellChunkMap as their index
@ -264,14 +271,13 @@ public class MemStoreLABImpl implements MemStoreLAB {
}
// We could put back the chunks to pool for reusing only when there is no
// opening scanner which will read their data
int count = openScannerCount.get();
if(count == 0) {
recycleChunks();
}
this.refCnt.release();
}
int getOpenScannerCount() {
return this.openScannerCount.get();
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
int getRefCntValue() {
return this.refCnt.refCnt();
}
/**
@ -279,7 +285,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
*/
@Override
public void incScannerCount() {
this.openScannerCount.incrementAndGet();
this.refCnt.retain();
}
/**
@ -287,10 +293,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
*/
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
if (this.closed.get() && count == 0) {
recycleChunks();
}
this.refCnt.release();
}
private void recycleChunks() {

View File

@ -2361,7 +2361,7 @@ public class TestHStore {
MemStoreLABImpl memStoreLAB =
(MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
assertTrue(memStoreLAB.isClosed());
assertTrue(memStoreLAB.getOpenScannerCount() == 0);
assertTrue(memStoreLAB.getRefCntValue() == 0);
assertTrue(memStoreLAB.isReclaimed());
assertTrue(memStoreLAB.chunks.isEmpty());
StoreScanner storeScanner = null;
@ -2436,6 +2436,103 @@ public class TestHStore {
}
}
/**
* This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
*/
@Test
public void testImmutableMemStoreLABRefCnt() throws Exception {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
final Cell smallCell2 = createCell(qf3, timestamp, seqId+1, smallValue);
final Cell largeCell2 = createCell(qf4, timestamp, seqId+1, largeValue);
final Cell smallCell3 = createCell(qf5, timestamp, seqId+2, smallValue);
final Cell largeCell3 = createCell(qf6, timestamp, seqId+2, largeValue);
int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
int flushByteSize = firstWriteCellByteSize - 2;
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
conf.setBoolean(WALFactory.WAL_ENABLED, false);
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
myCompactingMemStore.allowCompaction.set(false);
NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
store.add(smallCell1, memStoreSizing);
store.add(largeCell1, memStoreSizing);
store.add(smallCell2, memStoreSizing);
store.add(largeCell2, memStoreSizing);
store.add(smallCell3, memStoreSizing);
store.add(largeCell3, memStoreSizing);
VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
for (ImmutableSegment segment : segments) {
memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
}
List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 2);
}
myCompactingMemStore.allowCompaction.set(true);
myCompactingMemStore.flushInMemory();
versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
ImmutableMemStoreLAB immutableMemStoreLAB =
(ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 2);
}
List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 2);
}
assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
for (KeyValueScanner scanner : scanners1) {
scanner.close();
}
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 1);
}
for (KeyValueScanner scanner : scanners2) {
scanner.close();
}
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 1);
}
assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
flushStore(store, id++);
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.getRefCntValue() == 0);
}
assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
assertTrue(immutableMemStoreLAB.isClosed());
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
assertTrue(memStoreLAB.isClosed());
assertTrue(memStoreLAB.isReclaimed());
assertTrue(memStoreLAB.chunks.isEmpty());
}
}
private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);

View File

@ -160,7 +160,7 @@ public class TestStoreScannerClosure {
memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
if (memStoreLAB != null) {
// There should be no unpooled chunks
int refCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue();
assertTrue("The memstore should not have unpooled chunks", refCount == 0);
}
}