HBASE-18158 Two running in-memory compaction threads may lose data for flushing

This commit is contained in:
Chia-Ping Tsai 2017-06-07 14:10:48 +08:00
parent 1d3252eb59
commit 4a1529c89b
2 changed files with 109 additions and 3 deletions

View File

@ -94,10 +94,15 @@ public class CompactingMemStore extends AbstractMemStore {
this.store = store; this.store = store;
this.regionServices = regionServices; this.regionServices = regionServices;
this.pipeline = new CompactionPipeline(getRegionServices()); this.pipeline = new CompactionPipeline(getRegionServices());
this.compactor = new MemStoreCompactor(this, compactionPolicy); this.compactor = createMemStoreCompactor(compactionPolicy);
initInmemoryFlushSize(conf); initInmemoryFlushSize(conf);
} }
@VisibleForTesting
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
return new MemStoreCompactor(this, compactionPolicy);
}
private void initInmemoryFlushSize(Configuration conf) { private void initInmemoryFlushSize(Configuration conf) {
long memstoreFlushSize = getRegionServices().getMemstoreFlushSize(); long memstoreFlushSize = getRegionServices().getMemstoreFlushSize();
int numStores = getRegionServices().getNumStores(); int numStores = getRegionServices().getNumStores();
@ -410,7 +415,8 @@ public class CompactingMemStore extends AbstractMemStore {
return getRegionServices().getInMemoryCompactionPool(); return getRegionServices().getInMemoryCompactionPool();
} }
private boolean shouldFlushInMemory() { @VisibleForTesting
protected boolean shouldFlushInMemory() {
if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold
if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush
return false; // regardless the size return false; // regardless the size
@ -430,7 +436,6 @@ public class CompactingMemStore extends AbstractMemStore {
private void stopCompaction() { private void stopCompaction() {
if (inMemoryFlushInProgress.get()) { if (inMemoryFlushInProgress.get()) {
compactor.stop(); compactor.stop();
inMemoryFlushInProgress.set(false);
} }
} }

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
@ -70,6 +71,7 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -102,6 +104,7 @@ import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Test class for the Store * Test class for the Store
@ -1213,6 +1216,59 @@ public class TestStore {
} }
} }
/**
* If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
* may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
* versionedList to remove the corresponding segments.
* In short, there will be some segements which isn't in merge are removed.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=30000)
public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
int flushSize = 500;
Configuration conf = HBaseConfiguration.create();
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
// Set the lower threshold to invoke the "MERGE" policy
conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
init(name.getMethodName(), conf, hcd);
byte[] value = Bytes.toBytes("thisisavarylargevalue");
MemstoreSize memStoreSize = new MemstoreSize();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data whihc shouldn't be "seen" by client
store.add(createCell(qf1, ts, seqId, value), memStoreSize);
store.add(createCell(qf2, ts, seqId, value), memStoreSize);
store.add(createCell(qf3, ts, seqId, value), memStoreSize);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
storeFlushCtx.prepare();
// This shouldn't invoke another in-memory flush because the first compactor thread
// hasn't accomplished the in-memory compaction.
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
//okay. Let the compaction be completed
MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
while (mem.isMemStoreFlushingInMemory()) {
TimeUnit.SECONDS.sleep(1);
}
// This should invoke another in-memory flush.
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException { private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
HColumnDescriptor hcd = new HColumnDescriptor(family); HColumnDescriptor hcd = new HColumnDescriptor(family);
@ -1242,6 +1298,51 @@ public class TestStore {
void hook(MyStore store) throws IOException; void hook(MyStore store) throws IOException;
} }
private static class MyMemStoreCompactor extends MemStoreCompactor {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) {
super(compactingMemStore, compactionPolicy);
}
@Override
public boolean start() throws IOException {
boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
boolean rval = super.start();
if (isFirst) {
try {
START_COMPACTOR_LATCH.await();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
return rval;
}
}
public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparator c,
HStore store, RegionServicesForStores regionServices,
MemoryCompactionPolicy compactionPolicy) throws IOException {
super(conf, c, store, regionServices, compactionPolicy);
}
@Override
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
return new MyMemStoreCompactor(this, compactionPolicy);
}
@Override
protected boolean shouldFlushInMemory() {
boolean rval = super.shouldFlushInMemory();
if (rval) {
RUNNER_COUNT.incrementAndGet();
}
return rval;
}
}
public static class MyCompactingMemStore extends CompactingMemStore { public static class MyCompactingMemStore extends CompactingMemStore {
private static final AtomicBoolean START_TEST = new AtomicBoolean(false); private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
private final CountDownLatch getScannerLatch = new CountDownLatch(1); private final CountDownLatch getScannerLatch = new CountDownLatch(1);