From 9c8c749cd35bea3df2d7dd1fe857ec6452e4e68a Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 7 Jun 2017 14:10:48 +0800 Subject: [PATCH] HBASE-18158 Two running in-memory compaction threads may lose data for flushing --- .../regionserver/CompactingMemStore.java | 11 +- .../hadoop/hbase/regionserver/TestStore.java | 101 ++++++++++++++++++ 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 8d71efc1a2f..5b9372a8268 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -94,10 +94,15 @@ public class CompactingMemStore extends AbstractMemStore { this.store = store; this.regionServices = regionServices; this.pipeline = new CompactionPipeline(getRegionServices()); - this.compactor = new MemStoreCompactor(this, compactionPolicy); + this.compactor = createMemStoreCompactor(compactionPolicy); initInmemoryFlushSize(conf); } + @VisibleForTesting + protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) { + return new MemStoreCompactor(this, compactionPolicy); + } + private void initInmemoryFlushSize(Configuration conf) { long memstoreFlushSize = getRegionServices().getMemstoreFlushSize(); int numStores = getRegionServices().getNumStores(); @@ -410,7 +415,8 @@ public class CompactingMemStore extends AbstractMemStore { return getRegionServices().getInMemoryCompactionPool(); } - private boolean shouldFlushInMemory() { + @VisibleForTesting + protected boolean shouldFlushInMemory() { if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush return false; // regardless the size @@ -430,7 +436,6 @@ public class CompactingMemStore extends AbstractMemStore { private void stopCompaction() { if (inMemoryFlushInProgress.get()) { compactor.stop(); - inMemoryFlushInProgress.set(false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 1946d4345a1..22539c57f6b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; @@ -70,6 +71,7 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -102,6 +104,7 @@ import org.junit.rules.TestName; import org.mockito.Mockito; import com.google.common.collect.Lists; +import java.util.concurrent.atomic.AtomicInteger; /** * Test class for the Store @@ -1213,6 +1216,59 @@ public class TestStore { } } + /** + * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable + * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned + * versionedList to remove the corresponding segments. + * In short, there will be some segements which isn't in merge are removed. + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=30000) + public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { + int flushSize = 500; + Configuration conf = HBaseConfiguration.create(); + conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); + // Set the lower threshold to invoke the "MERGE" policy + conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); + init(name.getMethodName(), conf, hcd); + byte[] value = Bytes.toBytes("thisisavarylargevalue"); + MemstoreSize memStoreSize = new MemstoreSize(); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + // older data whihc shouldn't be "seen" by client + store.add(createCell(qf1, ts, seqId, value), memStoreSize); + store.add(createCell(qf2, ts, seqId, value), memStoreSize); + store.add(createCell(qf3, ts, seqId, value), memStoreSize); + assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); + StoreFlushContext storeFlushCtx = store.createFlushContext(id++); + storeFlushCtx.prepare(); + // This shouldn't invoke another in-memory flush because the first compactor thread + // hasn't accomplished the in-memory compaction. + store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); + store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); + store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); + assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); + //okay. Let the compaction be completed + MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); + CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore; + while (mem.isMemStoreFlushingInMemory()) { + TimeUnit.SECONDS.sleep(1); + } + // This should invoke another in-memory flush. + store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); + store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); + store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); + assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); + storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); + storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); + } + private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HColumnDescriptor hcd = new HColumnDescriptor(family); @@ -1242,6 +1298,51 @@ public class TestStore { void hook(MyStore store) throws IOException; } + private static class MyMemStoreCompactor extends MemStoreCompactor { + private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); + private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); + public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) { + super(compactingMemStore, compactionPolicy); + } + + @Override + public boolean start() throws IOException { + boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; + boolean rval = super.start(); + if (isFirst) { + try { + START_COMPACTOR_LATCH.await(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + return rval; + } + } + + public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { + private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); + public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparator c, + HStore store, RegionServicesForStores regionServices, + MemoryCompactionPolicy compactionPolicy) throws IOException { + super(conf, c, store, regionServices, compactionPolicy); + } + + @Override + protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) { + return new MyMemStoreCompactor(this, compactionPolicy); + } + + @Override + protected boolean shouldFlushInMemory() { + boolean rval = super.shouldFlushInMemory(); + if (rval) { + RUNNER_COUNT.incrementAndGet(); + } + return rval; + } + } + public static class MyCompactingMemStore extends CompactingMemStore { private static final AtomicBoolean START_TEST = new AtomicBoolean(false); private final CountDownLatch getScannerLatch = new CountDownLatch(1);