diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 556cf04c6c3..4b923ff3249 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -70,7 +70,9 @@ public abstract class AbstractMemStore implements MemStore { protected static void addToScanners(Segment segment, long readPt, List scanners) { - scanners.add(segment.getScanner(readPt)); + if (!segment.isEmpty()) { + scanners.add(segment.getScanner(readPt)); + } } protected AbstractMemStore(final Configuration conf, final CellComparator c, @@ -156,7 +158,7 @@ public abstract class AbstractMemStore implements MemStore { } } - private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) { + protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) { Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false); boolean mslabUsed = (toAdd != cell); // This cell data is backed by the same byte[] where we read request in RPC(See diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 4ee2db94724..973d5a41416 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -453,9 +453,14 @@ public class CompactingMemStore extends AbstractMemStore { inMemoryCompaction(); } - private void flushInMemory(MutableSegment currActive) { + protected void flushInMemory(MutableSegment currActive) { LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline"); - pushActiveToPipeline(currActive); + // NOTE: Due to concurrent writes and because we first add cell size to currActive.getDataSize + // and then actually add cell to currActive.cellSet, it is possible that + // currActive.getDataSize could not accommodate cellToAdd but currActive.cellSet is still + // empty if pending writes which not yet add cells to currActive.cellSet. + // so here we should not check currActive.isEmpty or not. + pushActiveToPipeline(currActive, false); } void inMemoryCompaction() { @@ -524,7 +529,23 @@ public class CompactingMemStore extends AbstractMemStore { } protected void pushActiveToPipeline(MutableSegment currActive) { - if (!currActive.isEmpty()) { + pushActiveToPipeline(currActive, true); + } + + /** + * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to + * concurrent writes and because we first add cell size to currActive.getDataSize and then + * actually add cell to currActive.cellSet, it is possible that currActive.getDataSize could not + * accommodate cellToAdd but currActive.cellSet is still empty if pending writes which not yet add + * cells to currActive.cellSet,so for + * {@link CompactingMemStore#flushInMemory(MutableSegment)},checkEmpty parameter is false. But if + * {@link CompactingMemStore#snapshot} called this method,because there is no pending + * write,checkEmpty parameter could be true. + * @param currActive + * @param checkEmpty + */ + protected void pushActiveToPipeline(MutableSegment currActive, boolean checkEmpty) { + if (!checkEmpty || !currActive.isEmpty()) { pipeline.pushHead(currActive); resetActive(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 711cfd3cb21..194e0654511 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -223,10 +223,16 @@ public class CompactionPipeline { LOG.warn("Segment flattening failed, because versions do not match"); return false; } - int i = 0; + int i = -1; for (ImmutableSegment s : pipeline) { + i++; if ( s.canBeFlattened() ) { s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed + if (s.isEmpty()) { + // after s.waitForUpdates() is called, there is no updates preceding,if no cells in s, + // we can skip it. + continue; + } // size to be updated MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing(); ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( @@ -242,9 +248,7 @@ public class CompactionPipeline { LOG.debug("Compaction pipeline segment {} flattened", s); return true; } - i++; } - } // do not update the global memstore size counter and do not increase the version, // because all the cells remain in place diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 4df9ae0b60c..4a994309ec8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -208,7 +208,8 @@ public class MemStoreCompactor { MemStoreSegmentsIterator iterator = null; List segments = versionedList.getStoreSegments(); for (ImmutableSegment s : segments) { - s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed + s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed. + // we skip empty segment when create MemStoreSegmentsIterator following. } switch (action) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 38fa587af99..297f5cdde27 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -176,11 +176,15 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // use case 1: both kvs in kvset this.memstore.add(kv1.clone(), null); this.memstore.add(kv2.clone(), null); - verifyScanAcrossSnapshot2(kv1, kv2); + // snapshot is empty,active segment is not empty, + // empty segment is skipped. + verifyOneScanAcrossSnapshot2(kv1, kv2); // use case 2: both kvs in snapshot this.memstore.snapshot(); - verifyScanAcrossSnapshot2(kv1, kv2); + // active segment is empty,snapshot is not empty, + // empty segment is skipped. + verifyOneScanAcrossSnapshot2(kv1, kv2); // use case 3: first in snapshot second in kvset this.memstore = new CompactingMemStore(HBaseConfiguration.create(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 973547dcdfd..0dc35c57b0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -255,11 +255,16 @@ public class TestDefaultMemStore { // use case 1: both kvs in kvset this.memstore.add(kv1.clone(), null); this.memstore.add(kv2.clone(), null); - verifyScanAcrossSnapshot2(kv1, kv2); + // snapshot is empty,active segment is not empty, + // empty segment is skipped. + verifyOneScanAcrossSnapshot2(kv1, kv2); // use case 2: both kvs in snapshot + // active segment is empty,snapshot is not empty, + // empty segment is skipped. this.memstore.snapshot(); - verifyScanAcrossSnapshot2(kv1, kv2); + // + verifyOneScanAcrossSnapshot2(kv1, kv2); // use case 3: first in snapshot second in kvset this.memstore = new DefaultMemStore(); @@ -288,6 +293,18 @@ public class TestDefaultMemStore { assertNull(scanner1.next()); } + protected void verifyOneScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { + List memstorescanners = this.memstore.getScanners(mvcc.getReadPoint()); + assertEquals(1, memstorescanners.size()); + final KeyValueScanner scanner0 = memstorescanners.get(0); + scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + Cell n0 = scanner0.next(); + Cell n1 = scanner0.next(); + assertTrue(kv1.equals(n0)); + assertTrue(kv2.equals(n1)); + assertNull(scanner0.next()); + } + protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) throws IOException { scanner.seek(KeyValueUtil.createFirstOnRow(new byte[]{})); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index e024eecc3b4..0e0a352fce4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -43,6 +43,7 @@ import java.util.NavigableSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @@ -1726,6 +1727,67 @@ public class TestHStore { assertArrayEquals(table, hFileContext.getTableName()); } + @Test + public void testCompactingMemStoreStuckBug26026() throws IOException, InterruptedException { + Configuration conf = HBaseConfiguration.create(); + + byte[] smallValue = new byte[3]; + byte[] largeValue = new byte[9]; + final long timestamp = EnvironmentEdgeManager.currentTime(); + final long seqId = 100; + final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); + final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); + int smallCellByteSize = MutableSegment.getCellLength(smallCell); + int largeCellByteSize = MutableSegment.getCellLength(largeCell); + int flushByteSize = smallCellByteSize + largeCellByteSize - 2; + + // set CompactingMemStore.inmemoryFlushSize to flushByteSize. + conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName()); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); + + init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) + .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); + + MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); + assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); + myCompactingMemStore.smallCellPreUpdateCounter.set(0); + myCompactingMemStore.smallCellPostUpdateCounter.set(0); + myCompactingMemStore.largeCellPreUpdateCounter.set(0); + myCompactingMemStore.largeCellPostUpdateCounter.set(0); + + Thread smallCellThread = new Thread(() -> { + store.add(smallCell, new NonThreadSafeMemStoreSizing()); + }); + smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); + smallCellThread.start(); + + String oldThreadName = Thread.currentThread().getName(); + try { + /** + * 1.smallCellThread enters CompactingMemStore.shouldFlushInMemory first, when largeCellThread + * enters CompactingMemStore.shouldFlushInMemory, CompactingMemStore.active.getDataSize could + * not accommodate cellToAdd and CompactingMemStore.shouldFlushInMemory return true. + *

+ * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread + * can add cell to currentActive . That is to say when largeCellThread called flushInMemory + * method, CompactingMemStore.active has no cell. + */ + Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME); + store.add(largeCell, new NonThreadSafeMemStoreSizing()); + smallCellThread.join(); + + for (int i = 0; i < 100; i++) { + long currentTimestamp = timestamp + 100 + i; + Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); + store.add(cell, new NonThreadSafeMemStoreSizing()); + } + } finally { + Thread.currentThread().setName(oldThreadName); + } + + } + private HStoreFile mockStoreFileWithLength(long length) { HStoreFile sf = mock(HStoreFile.class); StoreFileReader sfr = mock(StoreFileReader.class); @@ -1925,4 +1987,82 @@ public class TestHStore { @Override public List subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} } + + public static class MyCompactingMemStore2 extends CompactingMemStore { + private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; + private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; + private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); + private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); + private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0); + private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0); + private final AtomicInteger largeCellPostUpdateCounter = new AtomicInteger(0); + private final AtomicInteger smallCellPostUpdateCounter = new AtomicInteger(0); + + public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator, + HStore store, RegionServicesForStores regionServices, + MemoryCompactionPolicy compactionPolicy) throws IOException { + super(conf, cellComparator, store, regionServices, compactionPolicy); + } + + protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd, + MemStoreSizing memstoreSizing) { + if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { + int currentCount = largeCellPreUpdateCounter.incrementAndGet(); + if (currentCount <= 1) { + try { + /** + * smallCellThread enters super.shouldFlushInMemory first, when largeCellThread enters + * super.shouldFlushInMemory, currActive.getDataSize could not accommodate cellToAdd and + * super.shouldFlushInMemory return true. + */ + preCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + + boolean returnValue = super.shouldFlushInMemory(currActive, cellToAdd, memstoreSizing); + if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { + try { + preCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + return returnValue; + } + + @Override + protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) { + if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { + try { + /** + * After largeCellThread finished flushInMemory method, smallCellThread can add cell to + * currentActive . That is to say when largeCellThread called flushInMemory method, + * currentActive has no cell. + */ + postCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + super.doAdd(currentActive, cell, memstoreSizing); + } + + @Override + protected void flushInMemory(MutableSegment currentActiveMutableSegment) { + super.flushInMemory(currentActiveMutableSegment); + if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { + if (largeCellPreUpdateCounter.get() <= 1) { + try { + postCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + } + + } }