HBASE-26465 MemStoreLAB may be released early when its SegmentScanner is scanning (#3859)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Reviewed-by: Anoop Sam John <anoopsamjohn@apache.org>
chenglei 2021-11-25 20:32:28 +08:00 committed by GitHub
parent 25fc701752
commit 9fd58fecf0
8 changed files with 278 additions and 20 deletions

View File

@@ -232,6 +232,8 @@ public abstract class AbstractMemStore implements MemStore {
}
/**
* This method is protected under {@link HStore#lock} write lock,<br/>
* and this method is used by {@link HStore#updateStorefiles} after flushing is completed.<br/>
* The passed snapshot was successfully persisted; it can be let go.
* @param id Id of the snapshot to clean out.
* @see MemStore#snapshot()
@@ -245,6 +247,10 @@ public abstract class AbstractMemStore implements MemStore {
}
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
doClearSnapShot();
}
protected void doClearSnapShot() {
Segment oldSnapshot = this.snapshot;
if (!this.snapshot.isEmpty()) {
this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
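
The hunk above is truncated mid-method, so as a reading aid, here is a condensed, hedged sketch of the shape of the refactor (simplified stand-in types and a hypothetical class name, not the verbatim HBase code): clearSnapshot keeps the snapshot-id validation and delegates the actual swap to the new protected doClearSnapShot hook, which the MyDefaultMemStore subclass in TestHStore below overrides to inject synchronization points.

import java.util.Collections;
import java.util.List;

// Simplified model of the clearSnapshot -> doClearSnapShot split; all names are stand-ins.
abstract class MemStoreSketch {
  protected volatile List<String> snapshot = Collections.emptyList();
  protected volatile long snapshotId = -1;

  /** Public entry point: validates the passed id, then delegates to the overridable hook. */
  public void clearSnapshot(long id) {
    if (snapshotId == -1) {
      return; // no snapshot outstanding, nothing to clear
    }
    if (snapshotId != id) {
      throw new IllegalStateException("Current snapshot id is " + snapshotId + ", passed " + id);
    }
    doClearSnapShot();
  }

  /** New protected seam: a test subclass can pause here to interleave with getScanners. */
  protected void doClearSnapShot() {
    snapshot = Collections.emptyList();
    snapshotId = -1;
  }
}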

View File

@@ -394,6 +394,9 @@ public class CompactingMemStore extends AbstractMemStore {
return Bytes.toString(getFamilyNameInBytes());
}
/**
* This method is protected under {@link HStore#lock} read lock.
*/
@Override
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
MutableSegment activeTmp = getActive();

View File

@@ -132,16 +132,21 @@ public class DefaultMemStore extends AbstractMemStore {
}
@Override
/*
/**
* This method is protected under {@link HStore#lock} read lock. <br/>
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<KeyValueScanner> list = new ArrayList<>();
addToScanners(getActive(), readPt, list);
addToScanners(snapshot.getAllSegments(), readPt, list);
addToScanners(getSnapshotSegments(), readPt, list);
return list;
}
protected List<Segment> getSnapshotSegments() {
return snapshot.getAllSegments();
}
@Override
protected List<Segment> getSegments() throws IOException {
List<Segment> list = new ArrayList<>(2);
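
In the same spirit, a small self-contained model (hypothetical names, not the HBase class) of what this hunk does to the scanner path: getScanners now reaches the snapshot segments through a protected getSnapshotSegments() hook instead of touching the snapshot field directly, and that hook is the seam the MyDefaultMemStore test subclass below uses to pause between reading the snapshot and wrapping it in scanners.

import java.util.ArrayList;
import java.util.List;

// Simplified model of the getScanners refactor; names are stand-ins for the HBase types.
class DefaultMemStoreSketch {
  private final List<String> activeSegment = new ArrayList<>();
  private final List<String> snapshotSegments = new ArrayList<>();

  /** Builds the scanner list from the active segment plus the snapshot segments. */
  List<String> getScanners() {
    List<String> list = new ArrayList<>();
    list.addAll(activeSegment);          // active segment
    list.addAll(getSnapshotSegments());  // snapshot segments, now behind an overridable hook
    return list;
  }

  /** New protected seam; a test subclass can override it to block between the two steps. */
  protected List<String> getSnapshotSegments() {
    return snapshotSegments;
  }
}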

View File

@@ -150,8 +150,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
protected final MemStore memstore;
/**
* TODO: After making {@link DefaultMemStore} extensible in {@link HStore} by HBASE-26476, we
* can change it back to final.
*/
protected MemStore memstore;
// This stores directory in the filesystem.
private final HRegion region;
protected Configuration conf;
@@ -1222,6 +1225,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
/**
* NOTE: we should keep the clearSnapshot call inside the write lock because clearSnapshot may
* close {@link DefaultMemStore#snapshot}, which may be used by
* {@link DefaultMemStore#getScanners}.
*/
if (snapshotId > 0) {
this.memstore.clearSnapshot(snapshotId);
}
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@@ -1230,13 +1241,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// the lock.
this.lock.writeLock().unlock();
}
// We do not need to call the clearSnapshot method inside the write lock.
// The clearSnapshot itself is thread safe and can be called at the same time as other
// memstore operations except snapshot and clearSnapshot. And for these two methods, in HRegion
// we can guarantee that there is only one ongoing flush, so there will be no race.
if (snapshotId > 0) {
this.memstore.clearSnapshot(snapshotId);
}
// notify to be called here - only in case of flushes
notifyChangedReadersObservers(sfs);
if (LOG.isTraceEnabled()) {
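
As context for the NOTE added above and the old comment this hunk removes, here is a minimal self-contained sketch (plain ReentrantReadWriteLock, hypothetical names, not the HBase classes) of the locking discipline the change establishes: the flush path clears the snapshot while still holding the store write lock, and the scanner path reads the snapshot under the read lock, so the two can no longer interleave.

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified model of the HStore lock discipline after this change; names are stand-ins.
class StoreLockSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private Object snapshot = new Object();

  /** Flush path: clearSnapshot now runs inside the write lock, as in updateStorefiles. */
  void updateStorefiles(long snapshotId) {
    lock.writeLock().lock();
    try {
      // insertNewFiles(...) would happen here
      if (snapshotId > 0) {
        clearSnapshot(); // may close the snapshot and recycle its MemStoreLAB chunks
      }
    } finally {
      lock.writeLock().unlock();
    }
    // notifyChangedReadersObservers(...) stays outside the lock, as in the real method
  }

  /** Scanner path: runs under the read lock, so it cannot observe a snapshot mid-clear. */
  Object getScanners() {
    lock.readLock().lock();
    try {
      return snapshot;
    } finally {
      lock.readLock().unlock();
    }
  }

  private void clearSnapshot() {
    snapshot = new Object();
  }
}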

View File

@@ -81,10 +81,10 @@ public class MemStoreLABImpl implements MemStoreLAB {
// This flag is for closing this instance; it is set when clearing the snapshot of the
// memstore
private volatile boolean closed = false;
private final AtomicBoolean closed = new AtomicBoolean(false);
// This flag is for reclaiming chunks. It is set when putting chunks back to the
// pool
private AtomicBoolean reclaimed = new AtomicBoolean(false);
private final AtomicBoolean reclaimed = new AtomicBoolean(false);
// Current count of open scanners which are reading data from this MemStoreLAB
private final AtomicInteger openScannerCount = new AtomicInteger();
@@ -259,7 +259,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
*/
@Override
public void close() {
this.closed = true;
if (!this.closed.compareAndSet(false, true)) {
return;
}
// We can put the chunks back to the pool for reuse only when there is no
// open scanner still reading their data
int count = openScannerCount.get();
@@ -286,7 +288,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
if (this.closed && count == 0) {
if (this.closed.get() && count == 0) {
recycleChunks();
}
}
@@ -294,6 +296,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
private void recycleChunks() {
if (reclaimed.compareAndSet(false, true)) {
chunkCreator.putbackChunks(chunks);
chunks.clear();
}
}
@@ -409,13 +412,21 @@ public class MemStoreLABImpl implements MemStoreLAB {
return pooledChunks;
}
Integer getNumOfChunksReturnedToPool() {
Integer getNumOfChunksReturnedToPool(Set<Integer> chunksId) {
int i = 0;
for (Integer id : this.chunks) {
for (Integer id : chunksId) {
if (chunkCreator.isChunkInPool(id)) {
i++;
}
}
return i;
}
boolean isReclaimed() {
return reclaimed.get();
}
boolean isClosed() {
return closed.get();
}
}
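
A compact, self-contained model (stand-in names, not the HBase class) of the close/scanner-count protocol this hunk tightens: close() now flips an AtomicBoolean at most once, chunks are recycled only when the LAB is closed and the open-scanner count has dropped to zero, and the reclaimed CAS together with chunks.clear() ensures the chunk list is handed back to the pool at most once.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the MemStoreLAB close/refcount protocol; names are stand-ins.
class MemStoreLABSketch {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final AtomicBoolean reclaimed = new AtomicBoolean(false);
  private final AtomicInteger openScannerCount = new AtomicInteger();
  private final List<Integer> chunks = new ArrayList<>();

  void incScannerCount() {
    openScannerCount.incrementAndGet();
  }

  void decScannerCount() {
    int count = openScannerCount.decrementAndGet();
    if (closed.get() && count == 0) {
      recycleChunks(); // last scanner out after close triggers recycling
    }
  }

  void close() {
    if (!closed.compareAndSet(false, true)) {
      return; // close() is idempotent now
    }
    if (openScannerCount.get() == 0) {
      recycleChunks(); // no open scanners: recycle immediately
    }
  }

  private void recycleChunks() {
    if (reclaimed.compareAndSet(false, true)) {
      // putbackChunks(chunks) would go here
      chunks.clear(); // emptied so the chunk ids cannot be returned twice
    }
  }
}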

View File

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -2146,6 +2147,122 @@ public class TestHStore {
}
}
/**
* <pre>
* This test is for HBASE-26465 and exercises {@link DefaultMemStore#clearSnapshot} and
* {@link DefaultMemStore#getScanners} executing concurrently. The thread sequence before
* HBASE-26465 is:
* 1. The flush thread starts flushing the {@link DefaultMemStore} after some cells have been
* added to the {@link DefaultMemStore}.
* 2. The flush thread stops just before {@link DefaultMemStore#clearSnapshot} in
* {@link HStore#updateStorefiles}, after the memstore has been flushed to an hfile.
* 3. The scan thread starts and stops just after {@link DefaultMemStore#getSnapshotSegments} in
* {@link DefaultMemStore#getScanners}; here the scan thread gets the
* {@link DefaultMemStore#snapshot} that was created by the flush thread.
* 4. The flush thread continues with {@link DefaultMemStore#clearSnapshot} and closes the
* {@link DefaultMemStore#snapshot}; because the reference count of the corresponding
* {@link MemStoreLABImpl} is 0, the {@link Chunk}s in that {@link MemStoreLABImpl}
* are recycled.
* 5. The scan thread continues {@link DefaultMemStore#getScanners}, creates a
* {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increases the
* reference count of the corresponding {@link MemStoreLABImpl}; but the {@link Chunk}s in
* that {@link MemStoreLABImpl} were already recycled in step 4 and may be overwritten by
* other write threads, which can cause serious problems.
* After HBASE-26465, {@link DefaultMemStore#getScanners} and
* {@link DefaultMemStore#clearSnapshot} can no longer execute concurrently.
* </pre>
*/
@Test
public void testClearSnapshotGetScannerConcurrently() throws Exception {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
conf.setBoolean(WALFactory.WAL_ENABLED, false);
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
MyDefaultMemStore myDefaultMemStore = new MyDefaultMemStore(store.conf, store.getComparator(),
store.getHRegion().getRegionServicesForStores());
store.memstore = myDefaultMemStore;
myDefaultMemStore.store = store;
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
store.add(smallCell, memStoreSizing);
store.add(largeCell, memStoreSizing);
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
final Thread flushThread = new Thread(() -> {
try {
flushStore(store, id++);
} catch (Throwable exception) {
exceptionRef.set(exception);
}
});
flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
flushThread.start();
String oldThreadName = Thread.currentThread().getName();
StoreScanner storeScanner = null;
try {
Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
/**
* Wait for the flush thread to stop just before {@link DefaultMemStore#doClearSnapShot}.
*/
myDefaultMemStore.getScannerCyclicBarrier.await();
storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
flushThread.join();
if (myDefaultMemStore.shouldWait) {
SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
assertTrue(memStoreLAB.isClosed());
assertTrue(!memStoreLAB.chunks.isEmpty());
assertTrue(!memStoreLAB.isReclaimed());
Cell cell1 = segmentScanner.next();
assertTrue(CellUtil.equals(smallCell, cell1));
Cell cell2 = segmentScanner.next();
assertTrue(CellUtil.equals(largeCell, cell2));
assertNull(segmentScanner.next());
} else {
List<Cell> results = new ArrayList<>();
storeScanner.next(results);
assertEquals(2, results.size());
assertTrue(CellUtil.equals(smallCell, results.get(0)));
assertTrue(CellUtil.equals(largeCell, results.get(1)));
}
assertNull(exceptionRef.get());
} finally {
if (storeScanner != null) {
storeScanner.close();
}
Thread.currentThread().setName(oldThreadName);
}
}
private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
List<SegmentScanner> segmentScanners = new ArrayList<SegmentScanner>();
for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
if (keyValueScanner instanceof SegmentScanner) {
segmentScanners.add((SegmentScanner) keyValueScanner);
}
}
assertTrue(segmentScanners.size() == 1);
return segmentScanners.get(0);
}
@Test
public void testOnConfigurationChange() throws IOException {
final int COMMON_MAX_FILES_TO_COMPACT = 10;
@@ -2847,4 +2964,112 @@ public class TestHStore {
}
}
}
public static class MyDefaultMemStore extends DefaultMemStore {
private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
private static final String FLUSH_THREAD_NAME = "flushMyThread";
/**
* The getScanner thread may start only after the flush thread has entered
* {@link DefaultMemStore#doClearSnapShot}.
*/
private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
/**
* Used by the getScanner thread to notify the flush thread that
* {@link DefaultMemStore#getSnapshotSegments} has completed, so
* {@link DefaultMemStore#doClearSnapShot} can continue.
*/
private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
/**
* Used by the flush thread to notify the getScanner thread that
* {@link DefaultMemStore#doClearSnapShot} has completed, so
* {@link DefaultMemStore#getScanners} can continue.
*/
private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
private volatile boolean shouldWait = true;
private volatile HStore store = null;
public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
RegionServicesForStores regionServices)
throws IOException {
super(conf, cellComparator, regionServices);
}
@Override
protected List<Segment> getSnapshotSegments() {
List<Segment> result = super.getSnapshotSegments();
if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
if (currentCount == 1) {
if (this.shouldWait) {
try {
/**
* Notify the flush thread that {@link DefaultMemStore#getSnapshotSegments} has completed,
* so {@link DefaultMemStore#doClearSnapShot} can continue.
*/
preClearSnapShotCyclicBarrier.await();
/**
* Wait until {@link DefaultMemStore#doClearSnapShot} has completed.
*/
postClearSnapShotCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
return result;
}
@Override
protected void doClearSnapShot() {
if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
int currentCount = clearSnapshotCounter.incrementAndGet();
if (currentCount == 1) {
try {
if (store.lock.isWriteLockedByCurrentThread()) {
shouldWait = false;
}
/**
* The getScanner thread may start only after the flush thread has entered
* {@link DefaultMemStore#doClearSnapShot}.
*/
getScannerCyclicBarrier.await();
if (shouldWait) {
/**
* Wait until {@link DefaultMemStore#getSnapshotSegments} has completed.
*/
preClearSnapShotCyclicBarrier.await();
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
super.doClearSnapShot();
if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
int currentCount = clearSnapshotCounter.get();
if (currentCount == 1) {
if (shouldWait) {
try {
/**
* Notify the getScanner thread that {@link DefaultMemStore#doClearSnapShot} has completed,
* so {@link DefaultMemStore#getScanners} can continue.
*/
postClearSnapShotCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
}
}
}
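
MyDefaultMemStore above forces the problematic interleaving with CyclicBarriers; as a stripped-down illustration of that handshake technique (generic thread names, two barriers instead of the test's three, not the HBase test itself), each await() is a two-party rendezvous, so one thread can be parked at an exact line until the other reaches its matching point.

import java.util.concurrent.CyclicBarrier;

// Minimal illustration of the pairwise CyclicBarrier handshake used by the test above.
public class BarrierHandshakeSketch {
  public static void main(String[] args) throws Exception {
    CyclicBarrier beforeStep = new CyclicBarrier(2); // "you may proceed into the critical step"
    CyclicBarrier afterStep = new CyclicBarrier(2);  // "the critical step has finished"

    Thread flushLike = new Thread(() -> {
      try {
        beforeStep.await();                // wait until the scanner-like thread is positioned
        System.out.println("flush-like: clearing snapshot");
        afterStep.await();                 // tell the scanner-like thread it may continue
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });

    Thread scannerLike = new Thread(() -> {
      try {
        System.out.println("scanner-like: grabbed snapshot reference");
        beforeStep.await();                // release the flush-like thread
        afterStep.await();                 // block here until the snapshot has been cleared
        System.out.println("scanner-like: continuing after clearSnapshot");
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });

    flushLike.start();
    scannerLike.start();
    flushLike.join();
    scannerLike.join();
  }
}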

View File

@@ -24,9 +24,11 @@ import static org.junit.Assert.assertTrue;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
@@ -256,11 +258,12 @@ public class TestMemStoreLAB {
// none of the chunkIds would have been returned back
assertTrue("All the chunks must have been cleared",
ChunkCreator.instance.numberOfMappedChunks() != 0);
Set<Integer> chunkIds = new HashSet<Integer>(mslab.chunks);
int pooledChunksNum = mslab.getPooledChunks().size();
// close the mslab
mslab.close();
// make sure all chunks where reclaimed back to pool
int queueLength = mslab.getNumOfChunksReturnedToPool();
int queueLength = mslab.getNumOfChunksReturnedToPool(chunkIds);
assertTrue("All chunks in chunk queue should be reclaimed or removed"
+ " after mslab closed but actually: " + (pooledChunksNum-queueLength),
pooledChunksNum-queueLength == 0);

View File

@@ -162,8 +162,8 @@ public class TestStoreScannerClosure {
memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
if (memStoreLAB != null) {
// There should be no unpooled chunks
int openScannerCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
assertTrue("The memstore should not have unpooled chunks", openScannerCount == 0);
int refCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
assertTrue("The memstore should not have unpooled chunks", refCount == 0);
}
}
}