HBASE-26494 Using RefCnt to fix the flawed MemStoreLABImpl (#3983)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
96be585324
commit
1b8e577cc6
|
@ -68,7 +68,7 @@ public class CompactionPipeline {
|
|||
/**
|
||||
* <pre>
|
||||
* Version is volatile to ensure it is atomically read when not using a lock.
|
||||
* To indicate whether the suffix of pipleline changes:
|
||||
* To indicate whether the suffix of pipeline changes:
|
||||
* 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
|
||||
* added at Head, {@link #version} not change.
|
||||
* 2.for {@link CompactionPipeline#swap},{@link #version} increase.
|
||||
|
|
|
@ -17,10 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.nio.RefCnt;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -31,13 +33,16 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
public class ImmutableMemStoreLAB implements MemStoreLAB {
|
||||
|
||||
private final AtomicInteger openScannerCount = new AtomicInteger();
|
||||
private volatile boolean closed = false;
|
||||
private final RefCnt refCnt;
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
private final List<MemStoreLAB> mslabs;
|
||||
|
||||
public ImmutableMemStoreLAB(List<MemStoreLAB> mslabs) {
|
||||
this.mslabs = mslabs;
|
||||
this.refCnt = RefCnt.create(() -> {
|
||||
closeMSLABs();
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -85,47 +90,43 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
// 'openScannerCount' here tracks the scanners opened on segments which directly refer to this
|
||||
// MSLAB. The individual MSLABs this refers also having its own 'openScannerCount'. The usage of
|
||||
// 'refCnt' here tracks the scanners opened on segments which directly refer to this
|
||||
// MSLAB. The individual MSLABs this refers also having its own 'refCnt'. The usage of
|
||||
// the variable in close() and decScannerCount() is as as that in HeapMemstoreLAB. Here the
|
||||
// close just delegates the call to the individual MSLABs. The actual return of the chunks to
|
||||
// MSLABPool will happen within individual MSLABs only (which is at the leaf level).
|
||||
// Say an ImmutableMemStoreLAB is created over 2 HeapMemStoreLABs at some point and at that time
|
||||
// both of them were referred by ongoing scanners. So they have > 0 'openScannerCount'. Now over
|
||||
// the new Segment some scanners come in and this MSLABs 'openScannerCount' also goes up and
|
||||
// both of them were referred by ongoing scanners. So they have > 0 'refCnt'. Now over
|
||||
// the new Segment some scanners come in and this MSLABs 'refCnt' also goes up and
|
||||
// then come down on finish of scanners. Now a close() call comes to this Immutable MSLAB. As
|
||||
// it's 'openScannerCount' is zero it will call close() on both of the Heap MSLABs. Say by that
|
||||
// it's 'refCnt' is zero it will call close() on both of the Heap MSLABs. Say by that
|
||||
// time the old scanners on one of the MSLAB got over where as on the other, still an old
|
||||
// scanner is going on. The call close() on that MSLAB will not close it immediately but will
|
||||
// just mark it for closure as it's 'openScannerCount' still > 0. Later once the old scan is
|
||||
// just decrease it's 'refCnt' and it's 'refCnt' still > 0. Later once the old scan is
|
||||
// over, the decScannerCount() call will do the actual close and return of the chunks.
|
||||
this.closed = true;
|
||||
if (!this.closed.compareAndSet(false, true)) {
|
||||
return;
|
||||
}
|
||||
// When there are still on going scanners over this MSLAB, we will defer the close until all
|
||||
// scanners finish. We will just mark it for closure. See #decScannerCount(). This will be
|
||||
// called at end of every scan. When it is marked for closure and scanner count reached 0, we
|
||||
// will do the actual close then.
|
||||
checkAndCloseMSLABs(openScannerCount.get());
|
||||
// scanners finish. We will just decrease it's 'refCnt'. See #decScannerCount(). This will be
|
||||
// called at end of every scan. When it's 'refCnt' reached 0, we will do the actual close then.
|
||||
this.refCnt.release();
|
||||
}
|
||||
|
||||
private void checkAndCloseMSLABs(int openScanners) {
|
||||
if (openScanners == 0) {
|
||||
private void closeMSLABs() {
|
||||
for (MemStoreLAB mslab : this.mslabs) {
|
||||
mslab.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incScannerCount() {
|
||||
this.openScannerCount.incrementAndGet();
|
||||
this.refCnt.retain();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decScannerCount() {
|
||||
int count = this.openScannerCount.decrementAndGet();
|
||||
if (this.closed) {
|
||||
checkAndCloseMSLABs(count);
|
||||
}
|
||||
this.refCnt.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -138,5 +139,17 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
|
|||
return ChunkCreator.getInstance().isOffheap();
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*/src/test/.*")
|
||||
int getRefCntValue() {
|
||||
return this.refCnt.refCnt();
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*/src/test/.*")
|
||||
boolean isClosed() {
|
||||
return this.closed.get();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -18,13 +18,13 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ByteBufferExtendedCell;
|
|||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.ExtendedCell;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.nio.RefCnt;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -85,8 +86,11 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
// This flag is for reclaiming chunks. Its set when putting chunks back to
|
||||
// pool
|
||||
private final AtomicBoolean reclaimed = new AtomicBoolean(false);
|
||||
// Current count of open scanners which reading data from this MemStoreLAB
|
||||
private final AtomicInteger openScannerCount = new AtomicInteger();
|
||||
/**
|
||||
* Its initial value is 1, so it is one bigger than the current count of open scanners which
|
||||
* reading data from this MemStoreLAB.
|
||||
*/
|
||||
private final RefCnt refCnt;
|
||||
|
||||
// Used in testing
|
||||
public MemStoreLABImpl() {
|
||||
|
@ -100,6 +104,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
|
||||
Preconditions.checkArgument(maxAlloc <= dataChunkSize,
|
||||
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
|
||||
this.refCnt = RefCnt.create(() -> {
|
||||
recycleChunks();
|
||||
});
|
||||
|
||||
// if user requested to work with MSLABs (whether on- or off-heap), then the
|
||||
// immutable segments are going to use CellChunkMap as their index
|
||||
|
@ -264,14 +271,13 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
}
|
||||
// We could put back the chunks to pool for reusing only when there is no
|
||||
// opening scanner which will read their data
|
||||
int count = openScannerCount.get();
|
||||
if(count == 0) {
|
||||
recycleChunks();
|
||||
}
|
||||
this.refCnt.release();
|
||||
}
|
||||
|
||||
int getOpenScannerCount() {
|
||||
return this.openScannerCount.get();
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*/src/test/.*")
|
||||
int getRefCntValue() {
|
||||
return this.refCnt.refCnt();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -279,7 +285,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
*/
|
||||
@Override
|
||||
public void incScannerCount() {
|
||||
this.openScannerCount.incrementAndGet();
|
||||
this.refCnt.retain();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -287,10 +293,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
*/
|
||||
@Override
|
||||
public void decScannerCount() {
|
||||
int count = this.openScannerCount.decrementAndGet();
|
||||
if (this.closed.get() && count == 0) {
|
||||
recycleChunks();
|
||||
}
|
||||
this.refCnt.release();
|
||||
}
|
||||
|
||||
private void recycleChunks() {
|
||||
|
|
|
@ -2351,7 +2351,7 @@ public class TestHStore {
|
|||
MemStoreLABImpl memStoreLAB =
|
||||
(MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
|
||||
assertTrue(memStoreLAB.isClosed());
|
||||
assertTrue(memStoreLAB.getOpenScannerCount() == 0);
|
||||
assertTrue(memStoreLAB.getRefCntValue() == 0);
|
||||
assertTrue(memStoreLAB.isReclaimed());
|
||||
assertTrue(memStoreLAB.chunks.isEmpty());
|
||||
StoreScanner storeScanner = null;
|
||||
|
@ -2426,6 +2426,103 @@ public class TestHStore {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
|
||||
*/
|
||||
@Test
|
||||
public void testImmutableMemStoreLABRefCnt() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
|
||||
byte[] smallValue = new byte[3];
|
||||
byte[] largeValue = new byte[9];
|
||||
final long timestamp = EnvironmentEdgeManager.currentTime();
|
||||
final long seqId = 100;
|
||||
final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
|
||||
final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
|
||||
final Cell smallCell2 = createCell(qf3, timestamp, seqId+1, smallValue);
|
||||
final Cell largeCell2 = createCell(qf4, timestamp, seqId+1, largeValue);
|
||||
final Cell smallCell3 = createCell(qf5, timestamp, seqId+2, smallValue);
|
||||
final Cell largeCell3 = createCell(qf6, timestamp, seqId+2, largeValue);
|
||||
|
||||
int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
|
||||
int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
|
||||
int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
|
||||
int flushByteSize = firstWriteCellByteSize - 2;
|
||||
|
||||
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
|
||||
conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
|
||||
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
|
||||
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
|
||||
conf.setBoolean(WALFactory.WAL_ENABLED, false);
|
||||
|
||||
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
||||
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
||||
|
||||
final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
|
||||
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
|
||||
myCompactingMemStore.allowCompaction.set(false);
|
||||
|
||||
NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
store.add(smallCell1, memStoreSizing);
|
||||
store.add(largeCell1, memStoreSizing);
|
||||
store.add(smallCell2, memStoreSizing);
|
||||
store.add(largeCell2, memStoreSizing);
|
||||
store.add(smallCell3, memStoreSizing);
|
||||
store.add(largeCell3, memStoreSizing);
|
||||
VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
|
||||
assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
|
||||
List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
|
||||
List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
|
||||
for (ImmutableSegment segment : segments) {
|
||||
memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
|
||||
}
|
||||
List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
|
||||
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
|
||||
assertTrue(memStoreLAB.getRefCntValue() == 2);
|
||||
}
|
||||
|
||||
myCompactingMemStore.allowCompaction.set(true);
|
||||
myCompactingMemStore.flushInMemory();
|
||||
|
||||
versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
|
||||
assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
|
||||
ImmutableMemStoreLAB immutableMemStoreLAB =
|
||||
(ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
|
||||
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
|
||||
assertTrue(memStoreLAB.getRefCntValue() == 2);
|
||||
}
|
||||
|
||||
List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
|
||||
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
|
||||
assertTrue(memStoreLAB.getRefCntValue() == 2);
|
||||
}
|
||||
assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
|
||||
for (KeyValueScanner scanner : scanners1) {
|
||||
scanner.close();
|
||||
}
|
||||
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
|
||||
assertTrue(memStoreLAB.getRefCntValue() == 1);
|
||||
}
|
||||
for (KeyValueScanner scanner : scanners2) {
|
||||
scanner.close();
|
||||
}
|
||||
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
|
||||
assertTrue(memStoreLAB.getRefCntValue() == 1);
|
||||
}
|
||||
assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
|
||||
flushStore(store, id++);
|
||||
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
|
||||
assertTrue(memStoreLAB.getRefCntValue() == 0);
|
||||
}
|
||||
assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
|
||||
assertTrue(immutableMemStoreLAB.isClosed());
|
||||
for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
|
||||
assertTrue(memStoreLAB.isClosed());
|
||||
assertTrue(memStoreLAB.isReclaimed());
|
||||
assertTrue(memStoreLAB.chunks.isEmpty());
|
||||
}
|
||||
}
|
||||
|
||||
private HStoreFile mockStoreFileWithLength(long length) {
|
||||
HStoreFile sf = mock(HStoreFile.class);
|
||||
StoreFileReader sfr = mock(StoreFileReader.class);
|
||||
|
|
|
@ -160,7 +160,7 @@ public class TestStoreScannerClosure {
|
|||
memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
|
||||
if (memStoreLAB != null) {
|
||||
// There should be no unpooled chunks
|
||||
int refCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
|
||||
int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue();
|
||||
assertTrue("The memstore should not have unpooled chunks", refCount == 0);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue