HBASE-26210 HBase Write should be doomed to hang when cell size exceeds InmemoryFlushSize for CompactingMemStore (#3604)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
51233c3c9b
commit
dbf43dcfbd
|
@ -208,7 +208,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||||
stopCompaction();
|
stopCompaction();
|
||||||
// region level lock ensures pushing active to pipeline is done in isolation
|
// region level lock ensures pushing active to pipeline is done in isolation
|
||||||
// no concurrent update operations trying to flush the active segment
|
// no concurrent update operations trying to flush the active segment
|
||||||
pushActiveToPipeline(getActive());
|
pushActiveToPipeline(getActive(), true);
|
||||||
resetTimeOfOldestEdit();
|
resetTimeOfOldestEdit();
|
||||||
snapshotId = EnvironmentEdgeManager.currentTime();
|
snapshotId = EnvironmentEdgeManager.currentTime();
|
||||||
// in both cases whatever is pushed to snapshot is cleared from the pipeline
|
// in both cases whatever is pushed to snapshot is cleared from the pipeline
|
||||||
|
@ -413,33 +413,61 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether anything need to be done based on the current active set size.
|
* Check whether anything need to be done based on the current active set size. The method is
|
||||||
* The method is invoked upon every addition to the active set.
|
* invoked upon every addition to the active set. For CompactingMemStore, flush the active set to
|
||||||
* For CompactingMemStore, flush the active set to the read-only memory if it's
|
* the read-only memory if it's size is above threshold
|
||||||
* size is above threshold
|
|
||||||
* @param currActive intended segment to update
|
* @param currActive intended segment to update
|
||||||
* @param cellToAdd cell to be added to the segment
|
* @param cellToAdd cell to be added to the segment
|
||||||
* @param memstoreSizing object to accumulate changed size
|
* @param memstoreSizing object to accumulate changed size
|
||||||
* @return true if the cell can be added to the
|
* @return true if the cell can be added to the currActive
|
||||||
*/
|
*/
|
||||||
private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
|
protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
|
||||||
MemStoreSizing memstoreSizing) {
|
MemStoreSizing memstoreSizing) {
|
||||||
if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
|
long cellSize = MutableSegment.getCellLength(cellToAdd);
|
||||||
|
boolean successAdd = false;
|
||||||
|
while (true) {
|
||||||
|
long segmentDataSize = currActive.getDataSize();
|
||||||
|
if (!inWalReplay && segmentDataSize > inmemoryFlushSize) {
|
||||||
|
// when replaying edits from WAL there is no need in in-memory flush regardless the size
|
||||||
|
// otherwise size below flush threshold try to update atomically
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
|
||||||
|
if (memstoreSizing != null) {
|
||||||
|
memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
|
||||||
|
}
|
||||||
|
successAdd = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) {
|
||||||
|
// size above flush threshold so we flush in memory
|
||||||
|
this.tryFlushInMemoryAndCompactingAsync(currActive);
|
||||||
|
}
|
||||||
|
return successAdd;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to flush the currActive in memory and submit the background
|
||||||
|
* {@link InMemoryCompactionRunnable} to
|
||||||
|
* {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one thread can do the actual
|
||||||
|
* flushing in memory.
|
||||||
|
* @param currActive current Active Segment to be flush in memory.
|
||||||
|
*/
|
||||||
|
private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) {
|
||||||
if (currActive.setInMemoryFlushed()) {
|
if (currActive.setInMemoryFlushed()) {
|
||||||
flushInMemory(currActive);
|
flushInMemory(currActive);
|
||||||
if (setInMemoryCompactionFlag()) {
|
if (setInMemoryCompactionFlag()) {
|
||||||
// The thread is dispatched to do in-memory compaction in the background
|
// The thread is dispatched to do in-memory compaction in the background
|
||||||
InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
|
InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Dispatching the MemStore in-memory flush for store " + store
|
LOG.trace(
|
||||||
.getColumnFamilyName());
|
"Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
|
||||||
}
|
}
|
||||||
getPool().execute(runnable);
|
getPool().execute(runnable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// externally visible only for tests
|
// externally visible only for tests
|
||||||
|
@ -497,26 +525,6 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||||
return getRegionServices().getInMemoryCompactionPool();
|
return getRegionServices().getInMemoryCompactionPool();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
|
|
||||||
MemStoreSizing memstoreSizing) {
|
|
||||||
long cellSize = MutableSegment.getCellLength(cellToAdd);
|
|
||||||
long segmentDataSize = currActive.getDataSize();
|
|
||||||
while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
|
|
||||||
// when replaying edits from WAL there is no need in in-memory flush regardless the size
|
|
||||||
// otherwise size below flush threshold try to update atomically
|
|
||||||
if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
|
|
||||||
if (memstoreSizing != null) {
|
|
||||||
memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
|
|
||||||
}
|
|
||||||
// enough space for cell - no need to flush
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
segmentDataSize = currActive.getDataSize();
|
|
||||||
}
|
|
||||||
// size above flush threshold
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The request to cancel the compaction asynchronous task (caused by in-memory flush)
|
* The request to cancel the compaction asynchronous task (caused by in-memory flush)
|
||||||
* The compaction may still happen if the request was sent too late
|
* The compaction may still happen if the request was sent too late
|
||||||
|
@ -528,10 +536,6 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void pushActiveToPipeline(MutableSegment currActive) {
|
|
||||||
pushActiveToPipeline(currActive, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to
|
* NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to
|
||||||
* concurrent writes and because we first add cell size to currActive.getDataSize and then
|
* concurrent writes and because we first add cell size to currActive.getDataSize and then
|
||||||
|
|
|
@ -229,7 +229,7 @@ public class CompactionPipeline {
|
||||||
if ( s.canBeFlattened() ) {
|
if ( s.canBeFlattened() ) {
|
||||||
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
|
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
|
||||||
if (s.isEmpty()) {
|
if (s.isEmpty()) {
|
||||||
// after s.waitForUpdates() is called, there is no updates preceding,if no cells in s,
|
// after s.waitForUpdates() is called, there is no updates pending,if no cells in s,
|
||||||
// we can skip it.
|
// we can skip it.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -823,11 +823,11 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
||||||
|
|
||||||
// The in-memory flush size is bigger than the size of a single cell,
|
// The in-memory flush size is bigger than the size of a single cell,
|
||||||
// but smaller than the size of two cells.
|
// but smaller than the size of two cells.
|
||||||
// Therefore, the two created cells are flattened together.
|
// Therefore, the two created cells are flushed together as a single CSLMImmutableSegment and
|
||||||
|
// flattened.
|
||||||
totalHeapSize = MutableSegment.DEEP_OVERHEAD
|
totalHeapSize = MutableSegment.DEEP_OVERHEAD
|
||||||
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
|
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
|
||||||
+ 1 * oneCellOnCSLMHeapSize
|
+ 2 * oneCellOnCCMHeapSize;
|
||||||
+ 1 * oneCellOnCCMHeapSize;
|
|
||||||
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,10 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.IntBinaryOperator;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -1727,8 +1731,11 @@ public class TestHStore {
|
||||||
assertArrayEquals(table, hFileContext.getTableName());
|
assertArrayEquals(table, hFileContext.getTableName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
|
||||||
|
// but its dataSize exceeds inmemoryFlushSize
|
||||||
@Test
|
@Test
|
||||||
public void testCompactingMemStoreStuckBug26026() throws IOException, InterruptedException {
|
public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
||||||
byte[] smallValue = new byte[3];
|
byte[] smallValue = new byte[3];
|
||||||
|
@ -1752,12 +1759,15 @@ public class TestHStore {
|
||||||
MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
|
MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
|
||||||
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
|
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
|
||||||
myCompactingMemStore.smallCellPreUpdateCounter.set(0);
|
myCompactingMemStore.smallCellPreUpdateCounter.set(0);
|
||||||
myCompactingMemStore.smallCellPostUpdateCounter.set(0);
|
|
||||||
myCompactingMemStore.largeCellPreUpdateCounter.set(0);
|
myCompactingMemStore.largeCellPreUpdateCounter.set(0);
|
||||||
myCompactingMemStore.largeCellPostUpdateCounter.set(0);
|
|
||||||
|
|
||||||
|
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
|
||||||
Thread smallCellThread = new Thread(() -> {
|
Thread smallCellThread = new Thread(() -> {
|
||||||
|
try {
|
||||||
store.add(smallCell, new NonThreadSafeMemStoreSizing());
|
store.add(smallCell, new NonThreadSafeMemStoreSizing());
|
||||||
|
} catch (Throwable exception) {
|
||||||
|
exceptionRef.set(exception);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
|
smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
|
||||||
smallCellThread.start();
|
smallCellThread.start();
|
||||||
|
@ -1765,9 +1775,9 @@ public class TestHStore {
|
||||||
String oldThreadName = Thread.currentThread().getName();
|
String oldThreadName = Thread.currentThread().getName();
|
||||||
try {
|
try {
|
||||||
/**
|
/**
|
||||||
* 1.smallCellThread enters CompactingMemStore.shouldFlushInMemory first, when largeCellThread
|
* 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
|
||||||
* enters CompactingMemStore.shouldFlushInMemory, CompactingMemStore.active.getDataSize could
|
* largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
|
||||||
* not accommodate cellToAdd and CompactingMemStore.shouldFlushInMemory return true.
|
* invokes flushInMemory.
|
||||||
* <p/>
|
* <p/>
|
||||||
* 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
|
* 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
|
||||||
* can add cell to currentActive . That is to say when largeCellThread called flushInMemory
|
* can add cell to currentActive . That is to say when largeCellThread called flushInMemory
|
||||||
|
@ -1786,6 +1796,143 @@ public class TestHStore {
|
||||||
Thread.currentThread().setName(oldThreadName);
|
Thread.currentThread().setName(oldThreadName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assertTrue(exceptionRef.get() == null);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
|
||||||
|
// InmemoryFlushSize
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testCompactingMemStoreCellExceedInmemoryFlushSize()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
|
||||||
|
|
||||||
|
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
||||||
|
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
||||||
|
|
||||||
|
int size = (int) ((CompactingMemStore) store.memstore).getInmemoryFlushSize();
|
||||||
|
byte[] value = new byte[size + 1];
|
||||||
|
|
||||||
|
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||||
|
long timestamp = EnvironmentEdgeManager.currentTime();
|
||||||
|
long seqId = 100;
|
||||||
|
Cell cell = createCell(qf1, timestamp, seqId, value);
|
||||||
|
int cellByteSize = MutableSegment.getCellLength(cell);
|
||||||
|
store.add(cell, memStoreSizing);
|
||||||
|
assertTrue(memStoreSizing.getCellsCount() == 1);
|
||||||
|
assertTrue(memStoreSizing.getDataSize() == cellByteSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This test is for HBASE-26210 also, test write large cell and small cell concurrently when
|
||||||
|
// InmemoryFlushSize is smaller,equal with and larger than cell size.
|
||||||
|
@Test
|
||||||
|
public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
doWriteTestLargeCellAndSmallCellConcurrently(
|
||||||
|
(smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
|
||||||
|
doWriteTestLargeCellAndSmallCellConcurrently(
|
||||||
|
(smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
|
||||||
|
doWriteTestLargeCellAndSmallCellConcurrently(
|
||||||
|
(smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
|
||||||
|
doWriteTestLargeCellAndSmallCellConcurrently(
|
||||||
|
(smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
|
||||||
|
doWriteTestLargeCellAndSmallCellConcurrently(
|
||||||
|
(smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doWriteTestLargeCellAndSmallCellConcurrently(
|
||||||
|
IntBinaryOperator getFlushByteSize)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
||||||
|
byte[] smallValue = new byte[3];
|
||||||
|
byte[] largeValue = new byte[100];
|
||||||
|
final long timestamp = EnvironmentEdgeManager.currentTime();
|
||||||
|
final long seqId = 100;
|
||||||
|
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
|
||||||
|
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
|
||||||
|
int smallCellByteSize = MutableSegment.getCellLength(smallCell);
|
||||||
|
int largeCellByteSize = MutableSegment.getCellLength(largeCell);
|
||||||
|
int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
|
||||||
|
boolean flushByteSizeLessThanSmallAndLargeCellSize =
|
||||||
|
flushByteSize < (smallCellByteSize + largeCellByteSize);
|
||||||
|
|
||||||
|
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
|
||||||
|
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
|
||||||
|
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
|
||||||
|
|
||||||
|
|
||||||
|
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
||||||
|
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
|
||||||
|
|
||||||
|
MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
|
||||||
|
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
|
||||||
|
myCompactingMemStore.disableCompaction();
|
||||||
|
if (flushByteSizeLessThanSmallAndLargeCellSize) {
|
||||||
|
myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
|
||||||
|
} else {
|
||||||
|
myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
|
||||||
|
final AtomicLong totalCellByteSize = new AtomicLong(0);
|
||||||
|
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
|
||||||
|
Thread smallCellThread = new Thread(() -> {
|
||||||
|
try {
|
||||||
|
for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
|
||||||
|
long currentTimestamp = timestamp + i;
|
||||||
|
Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
|
||||||
|
totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
|
||||||
|
store.add(cell, memStoreSizing);
|
||||||
|
}
|
||||||
|
} catch (Throwable exception) {
|
||||||
|
exceptionRef.set(exception);
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
|
||||||
|
smallCellThread.start();
|
||||||
|
|
||||||
|
String oldThreadName = Thread.currentThread().getName();
|
||||||
|
try {
|
||||||
|
/**
|
||||||
|
* When flushByteSizeLessThanSmallAndLargeCellSize is true:
|
||||||
|
* </p>
|
||||||
|
* 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
|
||||||
|
* largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
|
||||||
|
* largeCellThread invokes flushInMemory.
|
||||||
|
* <p/>
|
||||||
|
* 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
|
||||||
|
* can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
|
||||||
|
* <p/>
|
||||||
|
* When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
|
||||||
|
* largeCellThread concurrently write one cell and wait each other, and then write another
|
||||||
|
* cell etc.
|
||||||
|
*/
|
||||||
|
Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
|
||||||
|
for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
|
||||||
|
long currentTimestamp = timestamp + i;
|
||||||
|
Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
|
||||||
|
totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
|
||||||
|
store.add(cell, memStoreSizing);
|
||||||
|
}
|
||||||
|
smallCellThread.join();
|
||||||
|
|
||||||
|
assertTrue(exceptionRef.get() == null);
|
||||||
|
assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
|
||||||
|
assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
|
||||||
|
if (flushByteSizeLessThanSmallAndLargeCellSize) {
|
||||||
|
assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
|
||||||
|
} else {
|
||||||
|
assertTrue(
|
||||||
|
myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
Thread.currentThread().setName(oldThreadName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private HStoreFile mockStoreFileWithLength(long length) {
|
private HStoreFile mockStoreFileWithLength(long length) {
|
||||||
|
@ -1889,7 +2036,7 @@ public class TestHStore {
|
||||||
return new ArrayList<>(capacity);
|
return new ArrayList<>(capacity);
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
protected void pushActiveToPipeline(MutableSegment active) {
|
protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
|
||||||
if (START_TEST.get()) {
|
if (START_TEST.get()) {
|
||||||
try {
|
try {
|
||||||
getScannerLatch.await();
|
getScannerLatch.await();
|
||||||
|
@ -1898,7 +2045,7 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
super.pushActiveToPipeline(active);
|
super.pushActiveToPipeline(active, checkEmpty);
|
||||||
if (START_TEST.get()) {
|
if (START_TEST.get()) {
|
||||||
snapshotLatch.countDown();
|
snapshotLatch.countDown();
|
||||||
}
|
}
|
||||||
|
@ -1995,8 +2142,6 @@ public class TestHStore {
|
||||||
private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
|
private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
|
||||||
private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
|
private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
|
||||||
private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
|
private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
|
||||||
private final AtomicInteger largeCellPostUpdateCounter = new AtomicInteger(0);
|
|
||||||
private final AtomicInteger smallCellPostUpdateCounter = new AtomicInteger(0);
|
|
||||||
|
|
||||||
public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
|
public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
|
||||||
HStore store, RegionServicesForStores regionServices,
|
HStore store, RegionServicesForStores regionServices,
|
||||||
|
@ -2004,16 +2149,17 @@ public class TestHStore {
|
||||||
super(conf, cellComparator, store, regionServices, compactionPolicy);
|
super(conf, cellComparator, store, regionServices, compactionPolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
|
@Override
|
||||||
|
protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
|
||||||
MemStoreSizing memstoreSizing) {
|
MemStoreSizing memstoreSizing) {
|
||||||
if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
|
if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
|
||||||
int currentCount = largeCellPreUpdateCounter.incrementAndGet();
|
int currentCount = largeCellPreUpdateCounter.incrementAndGet();
|
||||||
if (currentCount <= 1) {
|
if (currentCount <= 1) {
|
||||||
try {
|
try {
|
||||||
/**
|
/**
|
||||||
* smallCellThread enters super.shouldFlushInMemory first, when largeCellThread enters
|
* smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
|
||||||
* super.shouldFlushInMemory, currActive.getDataSize could not accommodate cellToAdd and
|
* largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
|
||||||
* super.shouldFlushInMemory return true.
|
* largeCellThread invokes flushInMemory.
|
||||||
*/
|
*/
|
||||||
preCyclicBarrier.await();
|
preCyclicBarrier.await();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
@ -2022,7 +2168,7 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean returnValue = super.shouldFlushInMemory(currActive, cellToAdd, memstoreSizing);
|
boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
|
||||||
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
|
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
|
||||||
try {
|
try {
|
||||||
preCyclicBarrier.await();
|
preCyclicBarrier.await();
|
||||||
|
@ -2065,4 +2211,93 @@ public class TestHStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class MyCompactingMemStore3 extends CompactingMemStore {
|
||||||
|
private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
|
||||||
|
private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
|
||||||
|
|
||||||
|
private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
|
||||||
|
private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
|
||||||
|
private final AtomicInteger flushCounter = new AtomicInteger(0);
|
||||||
|
private static final int CELL_COUNT = 5;
|
||||||
|
private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
|
||||||
|
|
||||||
|
public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
|
||||||
|
HStore store, RegionServicesForStores regionServices,
|
||||||
|
MemoryCompactionPolicy compactionPolicy) throws IOException {
|
||||||
|
super(conf, cellComparator, store, regionServices, compactionPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
|
||||||
|
MemStoreSizing memstoreSizing) {
|
||||||
|
if (!flushByteSizeLessThanSmallAndLargeCellSize) {
|
||||||
|
return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
|
||||||
|
}
|
||||||
|
if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
|
||||||
|
try {
|
||||||
|
preCyclicBarrier.await();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
|
||||||
|
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
|
||||||
|
try {
|
||||||
|
preCyclicBarrier.await();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return returnValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void postUpdate(MutableSegment currentActiveMutableSegment) {
|
||||||
|
super.postUpdate(currentActiveMutableSegment);
|
||||||
|
if (!flushByteSizeLessThanSmallAndLargeCellSize) {
|
||||||
|
try {
|
||||||
|
postCyclicBarrier.await();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
|
||||||
|
try {
|
||||||
|
postCyclicBarrier.await();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
|
||||||
|
super.flushInMemory(currentActiveMutableSegment);
|
||||||
|
flushCounter.incrementAndGet();
|
||||||
|
if (!flushByteSizeLessThanSmallAndLargeCellSize) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
|
||||||
|
try {
|
||||||
|
postCyclicBarrier.await();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void disableCompaction() {
|
||||||
|
allowCompaction.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void enableCompaction() {
|
||||||
|
allowCompaction.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue