HBASE-26026 HBase Write may be stuck forever when using CompactingMemStore (#3421)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
chenglei 2021-08-12 23:11:19 +08:00 committed by Duo Zhang
parent 2d89b08eb0
commit 8fced2d7f8
7 changed files with 202 additions and 13 deletions

View File

@ -70,7 +70,9 @@ public abstract class AbstractMemStore implements MemStore {
protected static void addToScanners(Segment segment, long readPt,
List<KeyValueScanner> scanners) {
scanners.add(segment.getScanner(readPt));
if (!segment.isEmpty()) {
scanners.add(segment.getScanner(readPt));
}
}
protected AbstractMemStore(final Configuration conf, final CellComparator c,
@ -156,7 +158,7 @@ public abstract class AbstractMemStore implements MemStore {
}
}
private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
boolean mslabUsed = (toAdd != cell);
// This cell data is backed by the same byte[] where we read request in RPC(See

View File

@ -453,9 +453,14 @@ public class CompactingMemStore extends AbstractMemStore {
inMemoryCompaction();
}
private void flushInMemory(MutableSegment currActive) {
protected void flushInMemory(MutableSegment currActive) {
LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
pushActiveToPipeline(currActive);
// NOTE: Writes are concurrent, and a writer first adds the cell size to
// currActive.getDataSize and only afterwards adds the cell to currActive.cellSet. So it is
// possible that currActive.getDataSize cannot accommodate cellToAdd while currActive.cellSet
// is still empty, because pending writes have not yet added their cells to
// currActive.cellSet. Therefore we must not check currActive.isEmpty here.
pushActiveToPipeline(currActive, false);
}
void inMemoryCompaction() {
@ -524,7 +529,23 @@ public class CompactingMemStore extends AbstractMemStore {
}
protected void pushActiveToPipeline(MutableSegment currActive) {
if (!currActive.isEmpty()) {
pushActiveToPipeline(currActive, true);
}
/**
 * Pushes {@code currActive} to the head of the pipeline and resets the active segment.
 * <p>
 * NOTE: Writes are concurrent, and a writer first adds the cell size to currActive.getDataSize
 * and only afterwards adds the cell to currActive.cellSet. So when
 * {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, it is possible
 * that currActive.getDataSize cannot accommodate cellToAdd while currActive.cellSet is still
 * empty, because pending writes have not yet added their cells to currActive.cellSet. For that
 * reason {@link CompactingMemStore#flushInMemory(MutableSegment)} passes {@code false} for
 * {@code checkEmpty}. When {@link CompactingMemStore#snapshot} calls this method there are no
 * pending writes, so {@code checkEmpty} can be {@code true}.
 * @param currActive the active segment to push into the pipeline
 * @param checkEmpty if {@code true}, push only when {@code currActive} is not empty; if
 *          {@code false}, push unconditionally
 */
protected void pushActiveToPipeline(MutableSegment currActive, boolean checkEmpty) {
if (!checkEmpty || !currActive.isEmpty()) {
pipeline.pushHead(currActive);
resetActive();
}

View File

@ -223,10 +223,16 @@ public class CompactionPipeline {
LOG.warn("Segment flattening failed, because versions do not match");
return false;
}
int i = 0;
int i = -1;
for (ImmutableSegment s : pipeline) {
i++;
if ( s.canBeFlattened() ) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
if (s.isEmpty()) {
// After s.waitForUpdates() returns there are no pending updates on s, so if s still
// has no cells we can skip it.
continue;
}
// size to be updated
MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
@ -242,9 +248,7 @@ public class CompactionPipeline {
LOG.debug("Compaction pipeline segment {} flattened", s);
return true;
}
i++;
}
}
// do not update the global memstore size counter and do not increase the version,
// because all the cells remain in place

View File

@ -208,7 +208,8 @@ public class MemStoreCompactor {
MemStoreSegmentsIterator iterator = null;
List<ImmutableSegment> segments = versionedList.getStoreSegments();
for (ImmutableSegment s : segments) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed.
// Empty segments are skipped later, when the MemStoreSegmentsIterator is created.
}
switch (action) {

View File

@ -176,11 +176,15 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
// use case 1: both kvs in kvset
this.memstore.add(kv1.clone(), null);
this.memstore.add(kv2.clone(), null);
verifyScanAcrossSnapshot2(kv1, kv2);
// The snapshot is empty and the active segment is not; the empty segment is skipped,
// so only one scanner is expected.
verifyOneScanAcrossSnapshot2(kv1, kv2);
// use case 2: both kvs in snapshot
this.memstore.snapshot();
verifyScanAcrossSnapshot2(kv1, kv2);
// The active segment is empty and the snapshot is not; the empty segment is skipped,
// so only one scanner is expected.
verifyOneScanAcrossSnapshot2(kv1, kv2);
// use case 3: first in snapshot second in kvset
this.memstore = new CompactingMemStore(HBaseConfiguration.create(),

View File

@ -253,11 +253,16 @@ public class TestDefaultMemStore {
// use case 1: both kvs in kvset
this.memstore.add(kv1.clone(), null);
this.memstore.add(kv2.clone(), null);
verifyScanAcrossSnapshot2(kv1, kv2);
// The snapshot is empty and the active segment is not; the empty segment is skipped,
// so only one scanner is expected.
verifyOneScanAcrossSnapshot2(kv1, kv2);
// use case 2: both kvs in snapshot
// After snapshot() the active segment is empty and the snapshot is not; the empty
// segment is skipped, so only one scanner is expected.
this.memstore.snapshot();
verifyScanAcrossSnapshot2(kv1, kv2);
//
verifyOneScanAcrossSnapshot2(kv1, kv2);
// use case 3: first in snapshot second in kvset
this.memstore = new DefaultMemStore();
@ -286,6 +291,18 @@ public class TestDefaultMemStore {
assertNull(scanner1.next());
}
/**
 * Asserts that the memstore exposes exactly one scanner at the current MVCC read point and that
 * this scanner, seeked to the first possible row, returns {@code kv1}, then {@code kv2}, and
 * then nothing more.
 * @param kv1 the cell expected first
 * @param kv2 the cell expected second
 * @throws IOException if obtaining, seeking or advancing the scanner fails
 */
protected void verifyOneScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException {
  List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
  assertEquals(1, memstorescanners.size());
  final KeyValueScanner scanner = memstorescanners.get(0);
  scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
  // assertEquals compares with kv.equals(actual), exactly like the former
  // assertTrue(kv.equals(actual)), but reports both values when the check fails.
  assertEquals(kv1, scanner.next());
  assertEquals(kv2, scanner.next());
  assertNull(scanner.next());
}
protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
throws IOException {
scanner.seek(KeyValueUtil.createFirstOnRow(new byte[]{}));

View File

@ -43,6 +43,7 @@ import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@ -1729,6 +1730,67 @@ public class TestHStore {
assertArrayEquals(table, hFileContext.getTableName());
}
/**
 * Regression test for HBASE-26026 (write may be stuck forever when using CompactingMemStore).
 * <p>
 * A small cell and a large cell are added concurrently through a {@link MyCompactingMemStore2},
 * which uses the thread names set below to tell the two writers apart and to force a specific
 * interleaving. Afterwards 100 more large cells are added from the test thread. Apart from the
 * in-memory flush size check there is no assertion on the outcome: the test passes when every
 * {@code store.add} call returns.
 */
@Test
public void testCompactingMemStoreStuckBug26026() throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
// Two bytes short of the combined length of both cells.
// NOTE(review): presumably chosen so each cell fits alone but the pair does not - confirm.
int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
// Set CompactingMemStore.inmemoryFlushSize to flushByteSize: 0.005 * (flushByteSize * 200)
// equals flushByteSize, and the assertTrue after init() checks the resulting value.
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
// Start all four counters from zero before the two writers run.
myCompactingMemStore.smallCellPreUpdateCounter.set(0);
myCompactingMemStore.smallCellPostUpdateCounter.set(0);
myCompactingMemStore.largeCellPreUpdateCounter.set(0);
myCompactingMemStore.largeCellPostUpdateCounter.set(0);
// The small-cell writer runs on its own thread; its name must be set before start() because
// MyCompactingMemStore2 identifies the writer by thread name.
Thread smallCellThread = new Thread(() -> {
store.add(smallCell, new NonThreadSafeMemStoreSizing());
});
smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
smallCellThread.start();
// The test thread itself plays the large-cell writer; remember its name so it can be restored.
String oldThreadName = Thread.currentThread().getName();
try {
/*
 * Intended interleaving, enforced by the barriers in MyCompactingMemStore2:
 * 1. smallCellThread passes CompactingMemStore.shouldFlushInMemory first, so when
 * largeCellThread enters CompactingMemStore.shouldFlushInMemory,
 * CompactingMemStore.active.getDataSize cannot accommodate cellToAdd and
 * CompactingMemStore.shouldFlushInMemory returns true.
 * 2. Only after largeCellThread has finished CompactingMemStore.flushInMemory may
 * smallCellThread add its cell to currentActive. That is, when largeCellThread called
 * flushInMemory, CompactingMemStore.active held no cell.
 */
Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
store.add(largeCell, new NonThreadSafeMemStoreSizing());
smallCellThread.join();
// Follow-up writes: 100 more large cells with distinct timestamps, still on the thread named
// largeCellThread. NOTE(review): presumably these are the writes that got stuck before the
// fix - confirm against the issue.
for (int i = 0; i < 100; i++) {
long currentTimestamp = timestamp + 100 + i;
Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
store.add(cell, new NonThreadSafeMemStoreSizing());
}
} finally {
// Always restore the test thread's original name.
Thread.currentThread().setName(oldThreadName);
}
}
private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
@ -1928,4 +1990,82 @@ public class TestHStore {
@Override
public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
}
/**
 * A {@link CompactingMemStore} that forces a fixed interleaving between a writer thread named
 * {@link #SMALL_CELL_THREAD_NAME} and one named {@link #LARGE_CELL_THREAD_NAME}. Threads with
 * any other name pass straight through to the superclass.
 * <p>
 * Two two-party barriers are used:
 * <ul>
 * <li>{@code preCyclicBarrier}: the large-cell thread waits before its first
 * {@code super.shouldFlushInMemory} call, the small-cell thread waits after its own call, so
 * the small-cell thread's call has completed before the large-cell thread's first call
 * starts.</li>
 * <li>{@code postCyclicBarrier}: the small-cell thread waits before {@code super.doAdd}, the
 * large-cell thread waits after {@code super.flushInMemory}, so the small cell is added only
 * after that in-memory flush has finished.</li>
 * </ul>
 */
public static class MyCompactingMemStore2 extends CompactingMemStore {
  private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
  private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
  private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
  private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
  // Counts shouldFlushInMemory calls made by the large-cell thread; only the first call
  // takes part in the barrier choreography.
  private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
  // The remaining counters are not updated by this class; the enclosing test resets them.
  private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
  private final AtomicInteger largeCellPostUpdateCounter = new AtomicInteger(0);
  private final AtomicInteger smallCellPostUpdateCounter = new AtomicInteger(0);

  public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices,
      MemoryCompactionPolicy compactionPolicy) throws IOException {
    super(conf, cellComparator, store, regionServices, compactionPolicy);
  }

  /**
   * Delegates to the superclass, but orders the first large-cell call after the small-cell
   * call: smallCellThread enters super.shouldFlushInMemory first, so when largeCellThread
   * enters super.shouldFlushInMemory, currActive.getDataSize cannot accommodate cellToAdd and
   * super.shouldFlushInMemory returns true.
   * @return whatever {@code super.shouldFlushInMemory} returned for this call
   */
  @Override
  protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
      MemStoreSizing memstoreSizing) {
    // The counter is incremented on every large-cell call, but only the first one waits.
    if (isCurrentThreadNamed(LARGE_CELL_THREAD_NAME)
        && largeCellPreUpdateCounter.incrementAndGet() <= 1) {
      awaitBarrier(preCyclicBarrier);
    }
    boolean returnValue = super.shouldFlushInMemory(currActive, cellToAdd, memstoreSizing);
    if (isCurrentThreadNamed(SMALL_CELL_THREAD_NAME)) {
      awaitBarrier(preCyclicBarrier);
    }
    return returnValue;
  }

  /**
   * Delegates to the superclass, but holds the small-cell thread back until largeCellThread has
   * finished flushInMemory. That is, when largeCellThread called flushInMemory, currentActive
   * had no cell.
   */
  @Override
  protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
    if (isCurrentThreadNamed(SMALL_CELL_THREAD_NAME)) {
      awaitBarrier(postCyclicBarrier);
    }
    super.doAdd(currentActive, cell, memstoreSizing);
  }

  /**
   * Delegates to the superclass and then, while the large-cell thread has made at most one
   * shouldFlushInMemory call, releases the small-cell thread waiting in {@code doAdd}.
   */
  @Override
  protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
    super.flushInMemory(currentActiveMutableSegment);
    if (isCurrentThreadNamed(LARGE_CELL_THREAD_NAME)
        && largeCellPreUpdateCounter.get() <= 1) {
      awaitBarrier(postCyclicBarrier);
    }
  }

  /** Returns whether the calling thread's name equals {@code threadName}. */
  private static boolean isCurrentThreadNamed(String threadName) {
    return Thread.currentThread().getName().equals(threadName);
  }

  /** Waits on {@code barrier}, rethrowing any failure wrapped in a RuntimeException. */
  private static void awaitBarrier(CyclicBarrier barrier) {
    try {
      barrier.await();
    } catch (Throwable e) {
      throw new RuntimeException(e);
    }
  }
}
}