diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 5f52b9562c5..72bb86c1bcc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -68,7 +68,7 @@ public class CompactionPipeline { /** *
* Version is volatile to ensure it is atomically read when not using a lock. - * To indicate whether the suffix of pipleline changes: + * To indicate whether the suffix of pipeline changes: * 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only * added at Head, {@link #version} not change. * 2.for {@link CompactionPipeline#swap},{@link #version} increase. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java index 0ce52b6fc75..bc4bf786067 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java @@ -17,10 +17,12 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.errorprone.annotations.RestrictedApi; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.yetus.audience.InterfaceAudience; /** @@ -31,13 +33,16 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class ImmutableMemStoreLAB implements MemStoreLAB { - private final AtomicInteger openScannerCount = new AtomicInteger(); - private volatile boolean closed = false; + private final RefCnt refCnt; + private final AtomicBoolean closed = new AtomicBoolean(false); private final Listmslabs; public ImmutableMemStoreLAB(List mslabs) { this.mslabs = mslabs; + this.refCnt = RefCnt.create(() -> { + closeMSLABs(); + }); } @Override @@ -85,47 +90,43 @@ public class ImmutableMemStoreLAB implements MemStoreLAB { @Override public void close() { - // 'openScannerCount' here tracks the scanners opened on segments which directly refer to this - // MSLAB. The individual MSLABs this refers also having its own 'openScannerCount'. The usage of + // 'refCnt' here tracks the scanners opened on segments which directly refer to this + // MSLAB. The individual MSLABs this refers also having its own 'refCnt'. The usage of // the variable in close() and decScannerCount() is as as that in HeapMemstoreLAB. Here the // close just delegates the call to the individual MSLABs. The actual return of the chunks to // MSLABPool will happen within individual MSLABs only (which is at the leaf level). // Say an ImmutableMemStoreLAB is created over 2 HeapMemStoreLABs at some point and at that time - // both of them were referred by ongoing scanners. So they have > 0 'openScannerCount'. Now over - // the new Segment some scanners come in and this MSLABs 'openScannerCount' also goes up and + // both of them were referred by ongoing scanners. So they have > 0 'refCnt'. Now over + // the new Segment some scanners come in and this MSLABs 'refCnt' also goes up and // then come down on finish of scanners. Now a close() call comes to this Immutable MSLAB. As - // it's 'openScannerCount' is zero it will call close() on both of the Heap MSLABs. Say by that + // it's 'refCnt' is zero it will call close() on both of the Heap MSLABs. Say by that // time the old scanners on one of the MSLAB got over where as on the other, still an old // scanner is going on. The call close() on that MSLAB will not close it immediately but will - // just mark it for closure as it's 'openScannerCount' still > 0. Later once the old scan is + // just decrease it's 'refCnt' and it's 'refCnt' still > 0. Later once the old scan is // over, the decScannerCount() call will do the actual close and return of the chunks. - this.closed = true; + if (!this.closed.compareAndSet(false, true)) { + return; + } // When there are still on going scanners over this MSLAB, we will defer the close until all - // scanners finish. We will just mark it for closure. See #decScannerCount(). This will be - // called at end of every scan. When it is marked for closure and scanner count reached 0, we - // will do the actual close then. - checkAndCloseMSLABs(openScannerCount.get()); + // scanners finish. We will just decrease it's 'refCnt'. See #decScannerCount(). This will be + // called at end of every scan. When it's 'refCnt' reached 0, we will do the actual close then. + this.refCnt.release(); } - private void checkAndCloseMSLABs(int openScanners) { - if (openScanners == 0) { - for (MemStoreLAB mslab : this.mslabs) { - mslab.close(); - } + private void closeMSLABs() { + for (MemStoreLAB mslab : this.mslabs) { + mslab.close(); } } @Override public void incScannerCount() { - this.openScannerCount.incrementAndGet(); + this.refCnt.retain(); } @Override public void decScannerCount() { - int count = this.openScannerCount.decrementAndGet(); - if (this.closed) { - checkAndCloseMSLABs(count); - } + this.refCnt.release(); } @Override @@ -138,5 +139,17 @@ public class ImmutableMemStoreLAB implements MemStoreLAB { return ChunkCreator.getInstance().isOffheap(); } + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + int getRefCntValue() { + return this.refCnt.refCnt(); + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + boolean isClosed() { + return this.closed.get(); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 5bc47dc4424..33174cae760 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -18,13 +18,13 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.errorprone.annotations.RestrictedApi; import java.nio.ByteBuffer; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ByteBufferExtendedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,8 +86,11 @@ public class MemStoreLABImpl implements MemStoreLAB { // This flag is for reclaiming chunks. Its set when putting chunks back to // pool private final AtomicBoolean reclaimed = new AtomicBoolean(false); - // Current count of open scanners which reading data from this MemStoreLAB - private final AtomicInteger openScannerCount = new AtomicInteger(); + /** + * Its initial value is 1, so it is one bigger than the current count of open scanners which + * reading data from this MemStoreLAB. + */ + private final RefCnt refCnt; // Used in testing public MemStoreLABImpl() { @@ -100,6 +104,9 @@ public class MemStoreLABImpl implements MemStoreLAB { // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument(maxAlloc <= dataChunkSize, MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); + this.refCnt = RefCnt.create(() -> { + recycleChunks(); + }); // if user requested to work with MSLABs (whether on- or off-heap), then the // immutable segments are going to use CellChunkMap as their index @@ -264,14 +271,13 @@ public class MemStoreLABImpl implements MemStoreLAB { } // We could put back the chunks to pool for reusing only when there is no // opening scanner which will read their data - int count = openScannerCount.get(); - if(count == 0) { - recycleChunks(); - } + this.refCnt.release(); } - int getOpenScannerCount() { - return this.openScannerCount.get(); + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + int getRefCntValue() { + return this.refCnt.refCnt(); } /** @@ -279,7 +285,7 @@ public class MemStoreLABImpl implements MemStoreLAB { */ @Override public void incScannerCount() { - this.openScannerCount.incrementAndGet(); + this.refCnt.retain(); } /** @@ -287,10 +293,7 @@ public class MemStoreLABImpl implements MemStoreLAB { */ @Override public void decScannerCount() { - int count = this.openScannerCount.decrementAndGet(); - if (this.closed.get() && count == 0) { - recycleChunks(); - } + this.refCnt.release(); } private void recycleChunks() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 92c210480a4..f6d58aad172 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -2351,7 +2351,7 @@ public class TestHStore { MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); assertTrue(memStoreLAB.isClosed()); - assertTrue(memStoreLAB.getOpenScannerCount() == 0); + assertTrue(memStoreLAB.getRefCntValue() == 0); assertTrue(memStoreLAB.isReclaimed()); assertTrue(memStoreLAB.chunks.isEmpty()); StoreScanner storeScanner = null; @@ -2426,6 +2426,103 @@ public class TestHStore { } } + /** + * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB} + */ + @Test + public void testImmutableMemStoreLABRefCnt() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + byte[] smallValue = new byte[3]; + byte[] largeValue = new byte[9]; + final long timestamp = EnvironmentEdgeManager.currentTime(); + final long seqId = 100; + final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue); + final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue); + final Cell smallCell2 = createCell(qf3, timestamp, seqId+1, smallValue); + final Cell largeCell2 = createCell(qf4, timestamp, seqId+1, largeValue); + final Cell smallCell3 = createCell(qf5, timestamp, seqId+2, smallValue); + final Cell largeCell3 = createCell(qf6, timestamp, seqId+2, largeValue); + + int smallCellByteSize = MutableSegment.getCellLength(smallCell1); + int largeCellByteSize = MutableSegment.getCellLength(largeCell1); + int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); + int flushByteSize = firstWriteCellByteSize - 2; + + // set CompactingMemStore.inmemoryFlushSize to flushByteSize. + conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); + conf.setBoolean(WALFactory.WAL_ENABLED, false); + + init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) + .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); + + final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore); + assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); + myCompactingMemStore.allowCompaction.set(false); + + NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); + store.add(smallCell1, memStoreSizing); + store.add(largeCell1, memStoreSizing); + store.add(smallCell2, memStoreSizing); + store.add(largeCell2, memStoreSizing); + store.add(smallCell3, memStoreSizing); + store.add(largeCell3, memStoreSizing); + VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); + assertTrue(versionedSegmentsList.getNumOfSegments() == 3); + List segments = versionedSegmentsList.getStoreSegments(); + List memStoreLABs = new ArrayList (segments.size()); + for (ImmutableSegment segment : segments) { + memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB()); + } + List scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE); + for (MemStoreLABImpl memStoreLAB : memStoreLABs) { + assertTrue(memStoreLAB.getRefCntValue() == 2); + } + + myCompactingMemStore.allowCompaction.set(true); + myCompactingMemStore.flushInMemory(); + + versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); + assertTrue(versionedSegmentsList.getNumOfSegments() == 1); + ImmutableMemStoreLAB immutableMemStoreLAB = + (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB()); + for (MemStoreLABImpl memStoreLAB : memStoreLABs) { + assertTrue(memStoreLAB.getRefCntValue() == 2); + } + + List scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE); + for (MemStoreLABImpl memStoreLAB : memStoreLABs) { + assertTrue(memStoreLAB.getRefCntValue() == 2); + } + assertTrue(immutableMemStoreLAB.getRefCntValue() == 2); + for (KeyValueScanner scanner : scanners1) { + scanner.close(); + } + for (MemStoreLABImpl memStoreLAB : memStoreLABs) { + assertTrue(memStoreLAB.getRefCntValue() == 1); + } + for (KeyValueScanner scanner : scanners2) { + scanner.close(); + } + for (MemStoreLABImpl memStoreLAB : memStoreLABs) { + assertTrue(memStoreLAB.getRefCntValue() == 1); + } + assertTrue(immutableMemStoreLAB.getRefCntValue() == 1); + flushStore(store, id++); + for (MemStoreLABImpl memStoreLAB : memStoreLABs) { + assertTrue(memStoreLAB.getRefCntValue() == 0); + } + assertTrue(immutableMemStoreLAB.getRefCntValue() == 0); + assertTrue(immutableMemStoreLAB.isClosed()); + for (MemStoreLABImpl memStoreLAB : memStoreLABs) { + assertTrue(memStoreLAB.isClosed()); + assertTrue(memStoreLAB.isReclaimed()); + assertTrue(memStoreLAB.chunks.isEmpty()); + } + } + private HStoreFile mockStoreFileWithLength(long length) { HStoreFile sf = mock(HStoreFile.class); StoreFileReader sfr = mock(StoreFileReader.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java index 79a970571f0..7ddc9b4cfaf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java @@ -160,7 +160,7 @@ public class TestStoreScannerClosure { memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB(); if (memStoreLAB != null) { // There should be no unpooled chunks - int refCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount(); + int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue(); assertTrue("The memstore should not have unpooled chunks", refCount == 0); } }