HBASE-18158 Two running in-memory compaction threads may lose data for flushing
This commit is contained in:
parent
9329a18c2d
commit
9c8c749cd3
|
@ -94,10 +94,15 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
this.store = store;
|
||||
this.regionServices = regionServices;
|
||||
this.pipeline = new CompactionPipeline(getRegionServices());
|
||||
this.compactor = new MemStoreCompactor(this, compactionPolicy);
|
||||
this.compactor = createMemStoreCompactor(compactionPolicy);
|
||||
initInmemoryFlushSize(conf);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
|
||||
return new MemStoreCompactor(this, compactionPolicy);
|
||||
}
|
||||
|
||||
private void initInmemoryFlushSize(Configuration conf) {
|
||||
long memstoreFlushSize = getRegionServices().getMemstoreFlushSize();
|
||||
int numStores = getRegionServices().getNumStores();
|
||||
|
@ -410,7 +415,8 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
return getRegionServices().getInMemoryCompactionPool();
|
||||
}
|
||||
|
||||
private boolean shouldFlushInMemory() {
|
||||
@VisibleForTesting
|
||||
protected boolean shouldFlushInMemory() {
|
||||
if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold
|
||||
if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush
|
||||
return false; // regardless the size
|
||||
|
@ -430,7 +436,6 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
private void stopCompaction() {
|
||||
if (inMemoryFlushInProgress.get()) {
|
||||
compactor.stop();
|
||||
inMemoryFlushInProgress.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.CellUtil;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -70,6 +71,7 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -102,6 +104,7 @@ import org.junit.rules.TestName;
|
|||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Test class for the Store
|
||||
|
@ -1213,6 +1216,59 @@ public class TestStore {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
|
||||
* may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
|
||||
* versionedList to remove the corresponding segments.
|
||||
* In short, there will be some segements which isn't in merge are removed.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=30000)
|
||||
public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
|
||||
int flushSize = 500;
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
|
||||
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
|
||||
// Set the lower threshold to invoke the "MERGE" policy
|
||||
conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
|
||||
init(name.getMethodName(), conf, hcd);
|
||||
byte[] value = Bytes.toBytes("thisisavarylargevalue");
|
||||
MemstoreSize memStoreSize = new MemstoreSize();
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
long seqId = 100;
|
||||
// older data whihc shouldn't be "seen" by client
|
||||
store.add(createCell(qf1, ts, seqId, value), memStoreSize);
|
||||
store.add(createCell(qf2, ts, seqId, value), memStoreSize);
|
||||
store.add(createCell(qf3, ts, seqId, value), memStoreSize);
|
||||
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
|
||||
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
|
||||
storeFlushCtx.prepare();
|
||||
// This shouldn't invoke another in-memory flush because the first compactor thread
|
||||
// hasn't accomplished the in-memory compaction.
|
||||
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
|
||||
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
|
||||
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
|
||||
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
|
||||
//okay. Let the compaction be completed
|
||||
MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
|
||||
CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
|
||||
while (mem.isMemStoreFlushingInMemory()) {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
}
|
||||
// This should invoke another in-memory flush.
|
||||
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
|
||||
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
|
||||
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
|
||||
assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
|
||||
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
|
||||
String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
|
||||
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
|
||||
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
|
||||
}
|
||||
|
||||
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
|
@ -1242,6 +1298,51 @@ public class TestStore {
|
|||
void hook(MyStore store) throws IOException;
|
||||
}
|
||||
|
||||
private static class MyMemStoreCompactor extends MemStoreCompactor {
|
||||
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
|
||||
private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
|
||||
public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) {
|
||||
super(compactingMemStore, compactionPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean start() throws IOException {
|
||||
boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
|
||||
boolean rval = super.start();
|
||||
if (isFirst) {
|
||||
try {
|
||||
START_COMPACTOR_LATCH.await();
|
||||
} catch (InterruptedException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
}
|
||||
|
||||
public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
|
||||
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
|
||||
public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparator c,
|
||||
HStore store, RegionServicesForStores regionServices,
|
||||
MemoryCompactionPolicy compactionPolicy) throws IOException {
|
||||
super(conf, c, store, regionServices, compactionPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
|
||||
return new MyMemStoreCompactor(this, compactionPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldFlushInMemory() {
|
||||
boolean rval = super.shouldFlushInMemory();
|
||||
if (rval) {
|
||||
RUNNER_COUNT.incrementAndGet();
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
}
|
||||
|
||||
public static class MyCompactingMemStore extends CompactingMemStore {
|
||||
private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
|
||||
private final CountDownLatch getScannerLatch = new CountDownLatch(1);
|
||||
|
|
Loading…
Reference in New Issue