HBASE-26026 HBase Write may be stuck forever when using CompactingMemStore (#3421)
Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
238c9b40bf
commit
11222fc4df
|
@ -70,7 +70,9 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
|
||||
protected static void addToScanners(Segment segment, long readPt,
|
||||
List<KeyValueScanner> scanners) {
|
||||
scanners.add(segment.getScanner(readPt));
|
||||
if (!segment.isEmpty()) {
|
||||
scanners.add(segment.getScanner(readPt));
|
||||
}
|
||||
}
|
||||
|
||||
protected AbstractMemStore(final Configuration conf, final CellComparator c,
|
||||
|
@ -156,7 +158,7 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
}
|
||||
}
|
||||
|
||||
private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
|
||||
protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
|
||||
Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
|
||||
boolean mslabUsed = (toAdd != cell);
|
||||
// This cell data is backed by the same byte[] where we read request in RPC(See
|
||||
|
|
|
@ -453,9 +453,14 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
inMemoryCompaction();
|
||||
}
|
||||
|
||||
private void flushInMemory(MutableSegment currActive) {
|
||||
protected void flushInMemory(MutableSegment currActive) {
|
||||
LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
|
||||
pushActiveToPipeline(currActive);
|
||||
// NOTE: Due to concurrent writes and because we first add cell size to currActive.getDataSize
|
||||
// and then actually add cell to currActive.cellSet, it is possible that
|
||||
// currActive.getDataSize could not accommodate cellToAdd but currActive.cellSet is still
|
||||
// empty if pending writes which not yet add cells to currActive.cellSet.
|
||||
// so here we should not check currActive.isEmpty or not.
|
||||
pushActiveToPipeline(currActive, false);
|
||||
}
|
||||
|
||||
void inMemoryCompaction() {
|
||||
|
@ -524,7 +529,23 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
}
|
||||
|
||||
protected void pushActiveToPipeline(MutableSegment currActive) {
|
||||
if (!currActive.isEmpty()) {
|
||||
pushActiveToPipeline(currActive, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to
|
||||
* concurrent writes and because we first add cell size to currActive.getDataSize and then
|
||||
* actually add cell to currActive.cellSet, it is possible that currActive.getDataSize could not
|
||||
* accommodate cellToAdd but currActive.cellSet is still empty if pending writes which not yet add
|
||||
* cells to currActive.cellSet,so for
|
||||
* {@link CompactingMemStore#flushInMemory(MutableSegment)},checkEmpty parameter is false. But if
|
||||
* {@link CompactingMemStore#snapshot} called this method,because there is no pending
|
||||
* write,checkEmpty parameter could be true.
|
||||
* @param currActive
|
||||
* @param checkEmpty
|
||||
*/
|
||||
protected void pushActiveToPipeline(MutableSegment currActive, boolean checkEmpty) {
|
||||
if (!checkEmpty || !currActive.isEmpty()) {
|
||||
pipeline.pushHead(currActive);
|
||||
resetActive();
|
||||
}
|
||||
|
|
|
@ -223,10 +223,16 @@ public class CompactionPipeline {
|
|||
LOG.warn("Segment flattening failed, because versions do not match");
|
||||
return false;
|
||||
}
|
||||
int i = 0;
|
||||
int i = -1;
|
||||
for (ImmutableSegment s : pipeline) {
|
||||
i++;
|
||||
if ( s.canBeFlattened() ) {
|
||||
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
|
||||
if (s.isEmpty()) {
|
||||
// after s.waitForUpdates() is called, there is no updates preceding,if no cells in s,
|
||||
// we can skip it.
|
||||
continue;
|
||||
}
|
||||
// size to be updated
|
||||
MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
|
||||
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
|
||||
|
@ -242,9 +248,7 @@ public class CompactionPipeline {
|
|||
LOG.debug("Compaction pipeline segment {} flattened", s);
|
||||
return true;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
||||
}
|
||||
// do not update the global memstore size counter and do not increase the version,
|
||||
// because all the cells remain in place
|
||||
|
|
|
@ -208,7 +208,8 @@ public class MemStoreCompactor {
|
|||
MemStoreSegmentsIterator iterator = null;
|
||||
List<ImmutableSegment> segments = versionedList.getStoreSegments();
|
||||
for (ImmutableSegment s : segments) {
|
||||
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
|
||||
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed.
|
||||
// we skip empty segment when create MemStoreSegmentsIterator following.
|
||||
}
|
||||
|
||||
switch (action) {
|
||||
|
|
|
@ -178,11 +178,15 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
// use case 1: both kvs in kvset
|
||||
this.memstore.add(kv1.clone(), null);
|
||||
this.memstore.add(kv2.clone(), null);
|
||||
verifyScanAcrossSnapshot2(kv1, kv2);
|
||||
// snapshot is empty,active segment is not empty,
|
||||
// empty segment is skipped.
|
||||
verifyOneScanAcrossSnapshot2(kv1, kv2);
|
||||
|
||||
// use case 2: both kvs in snapshot
|
||||
this.memstore.snapshot();
|
||||
verifyScanAcrossSnapshot2(kv1, kv2);
|
||||
// active segment is empty,snapshot is not empty,
|
||||
// empty segment is skipped.
|
||||
verifyOneScanAcrossSnapshot2(kv1, kv2);
|
||||
|
||||
// use case 3: first in snapshot second in kvset
|
||||
this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
|
||||
|
|
|
@ -256,11 +256,16 @@ public class TestDefaultMemStore {
|
|||
// use case 1: both kvs in kvset
|
||||
this.memstore.add(kv1.clone(), null);
|
||||
this.memstore.add(kv2.clone(), null);
|
||||
verifyScanAcrossSnapshot2(kv1, kv2);
|
||||
// snapshot is empty,active segment is not empty,
|
||||
// empty segment is skipped.
|
||||
verifyOneScanAcrossSnapshot2(kv1, kv2);
|
||||
|
||||
// use case 2: both kvs in snapshot
|
||||
// active segment is empty,snapshot is not empty,
|
||||
// empty segment is skipped.
|
||||
this.memstore.snapshot();
|
||||
verifyScanAcrossSnapshot2(kv1, kv2);
|
||||
//
|
||||
verifyOneScanAcrossSnapshot2(kv1, kv2);
|
||||
|
||||
// use case 3: first in snapshot second in kvset
|
||||
this.memstore = new DefaultMemStore();
|
||||
|
@ -289,6 +294,18 @@ public class TestDefaultMemStore {
|
|||
assertNull(scanner1.next());
|
||||
}
|
||||
|
||||
protected void verifyOneScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException {
|
||||
List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
|
||||
assertEquals(1, memstorescanners.size());
|
||||
final KeyValueScanner scanner0 = memstorescanners.get(0);
|
||||
scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
|
||||
Cell n0 = scanner0.next();
|
||||
Cell n1 = scanner0.next();
|
||||
assertTrue(kv1.equals(n0));
|
||||
assertTrue(kv2.equals(n1));
|
||||
assertNull(scanner0.next());
|
||||
}
|
||||
|
||||
protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
|
||||
throws IOException {
|
||||
scanner.seek(KeyValueUtil.createFirstOnRow(new byte[]{}));
|
||||
|
|
|
@ -43,6 +43,7 @@ import java.util.NavigableSet;
|
|||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -1712,6 +1713,67 @@ public class TestHStore {
|
|||
assertArrayEquals(table, hFileContext.getTableName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactingMemStoreStuckBug26026() throws IOException, InterruptedException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
|
||||
byte[] smallValue = new byte[3];
|
||||
byte[] largeValue = new byte[9];
|
||||
final long timestamp = EnvironmentEdgeManager.currentTime();
|
||||
final long seqId = 100;
|
||||
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
|
||||
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
|
||||
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
|
||||
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
|
||||
int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
|
||||
|
||||
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
|
||||
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
|
||||
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
|
||||
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
|
||||
|
||||
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
||||
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
||||
|
||||
MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
|
||||
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
|
||||
myCompactingMemStore.smallCellPreUpdateCounter.set(0);
|
||||
myCompactingMemStore.smallCellPostUpdateCounter.set(0);
|
||||
myCompactingMemStore.largeCellPreUpdateCounter.set(0);
|
||||
myCompactingMemStore.largeCellPostUpdateCounter.set(0);
|
||||
|
||||
Thread smallCellThread = new Thread(() -> {
|
||||
store.add(smallCell, new NonThreadSafeMemStoreSizing());
|
||||
});
|
||||
smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
|
||||
smallCellThread.start();
|
||||
|
||||
String oldThreadName = Thread.currentThread().getName();
|
||||
try {
|
||||
/**
|
||||
* 1.smallCellThread enters CompactingMemStore.shouldFlushInMemory first, when largeCellThread
|
||||
* enters CompactingMemStore.shouldFlushInMemory, CompactingMemStore.active.getDataSize could
|
||||
* not accommodate cellToAdd and CompactingMemStore.shouldFlushInMemory return true.
|
||||
* <p/>
|
||||
* 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
|
||||
* can add cell to currentActive . That is to say when largeCellThread called flushInMemory
|
||||
* method, CompactingMemStore.active has no cell.
|
||||
*/
|
||||
Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
|
||||
store.add(largeCell, new NonThreadSafeMemStoreSizing());
|
||||
smallCellThread.join();
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
long currentTimestamp = timestamp + 100 + i;
|
||||
Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
|
||||
store.add(cell, new NonThreadSafeMemStoreSizing());
|
||||
}
|
||||
} finally {
|
||||
Thread.currentThread().setName(oldThreadName);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private HStoreFile mockStoreFileWithLength(long length) {
|
||||
HStoreFile sf = mock(HStoreFile.class);
|
||||
StoreFileReader sfr = mock(StoreFileReader.class);
|
||||
|
@ -1911,4 +1973,82 @@ public class TestHStore {
|
|||
@Override
|
||||
public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
|
||||
}
|
||||
|
||||
public static class MyCompactingMemStore2 extends CompactingMemStore {
|
||||
private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
|
||||
private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
|
||||
private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
|
||||
private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
|
||||
private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger largeCellPostUpdateCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger smallCellPostUpdateCounter = new AtomicInteger(0);
|
||||
|
||||
public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
|
||||
HStore store, RegionServicesForStores regionServices,
|
||||
MemoryCompactionPolicy compactionPolicy) throws IOException {
|
||||
super(conf, cellComparator, store, regionServices, compactionPolicy);
|
||||
}
|
||||
|
||||
protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
|
||||
MemStoreSizing memstoreSizing) {
|
||||
if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
|
||||
int currentCount = largeCellPreUpdateCounter.incrementAndGet();
|
||||
if (currentCount <= 1) {
|
||||
try {
|
||||
/**
|
||||
* smallCellThread enters super.shouldFlushInMemory first, when largeCellThread enters
|
||||
* super.shouldFlushInMemory, currActive.getDataSize could not accommodate cellToAdd and
|
||||
* super.shouldFlushInMemory return true.
|
||||
*/
|
||||
preCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean returnValue = super.shouldFlushInMemory(currActive, cellToAdd, memstoreSizing);
|
||||
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
|
||||
try {
|
||||
preCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return returnValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
|
||||
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
|
||||
try {
|
||||
/**
|
||||
* After largeCellThread finished flushInMemory method, smallCellThread can add cell to
|
||||
* currentActive . That is to say when largeCellThread called flushInMemory method,
|
||||
* currentActive has no cell.
|
||||
*/
|
||||
postCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
super.doAdd(currentActive, cell, memstoreSizing);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
|
||||
super.flushInMemory(currentActiveMutableSegment);
|
||||
if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
|
||||
if (largeCellPreUpdateCounter.get() <= 1) {
|
||||
try {
|
||||
postCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue