From d822ee3a7ccc4959ed5a4b85bb54ff6142aa7d6e Mon Sep 17 00:00:00 2001 From: eshcar Date: Sun, 1 Jul 2018 15:29:26 +0300 Subject: [PATCH] HBASE-20542: Better heap utilization for IMC with MSLABs --- .../hbase/regionserver/AbstractMemStore.java | 145 +++++++----- .../CellArrayImmutableSegment.java | 5 +- .../CellChunkImmutableSegment.java | 17 +- .../regionserver/CompactingMemStore.java | 217 +++++++++++------- .../regionserver/CompactionPipeline.java | 16 +- .../CompositeImmutableSegment.java | 5 +- .../hbase/regionserver/DefaultMemStore.java | 40 ++-- .../hbase/regionserver/MemStoreCompactor.java | 12 +- .../hbase/regionserver/MemStoreLABImpl.java | 6 +- .../hbase/regionserver/MemStoreSizing.java | 8 +- .../hbase/regionserver/MutableSegment.java | 25 +- .../NonThreadSafeMemStoreSizing.java | 10 +- .../regionserver/RegionServicesForStores.java | 8 - .../hadoop/hbase/regionserver/Segment.java | 72 ++++-- .../ThreadSafeMemStoreSizing.java | 4 + .../apache/hadoop/hbase/io/TestHeapSize.java | 12 + .../regionserver/TestCompactingMemStore.java | 4 +- .../TestCompactingToCellFlatMapMemStore.java | 50 ++-- .../hadoop/hbase/regionserver/TestHStore.java | 13 +- 19 files changed, 419 insertions(+), 250 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index b82afba93d2..a7a1af88ff1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -47,7 +47,7 @@ public abstract class AbstractMemStore implements MemStore { private final CellComparator comparator; // active segment absorbs write operations - protected volatile MutableSegment active; + private volatile MutableSegment active; // Snapshot of memstore. Made for flusher. protected volatile ImmutableSegment snapshot; protected volatile long snapshotId; @@ -82,8 +82,8 @@ public abstract class AbstractMemStore implements MemStore { protected void resetActive() { // Reset heap to not include any keys - this.active = SegmentFactory.instance().createMutableSegment(conf, comparator); - this.timeOfOldestEdit = Long.MAX_VALUE; + active = SegmentFactory.instance().createMutableSegment(conf, comparator); + timeOfOldestEdit = Long.MAX_VALUE; } /** @@ -102,12 +102,52 @@ public abstract class AbstractMemStore implements MemStore { @Override public void add(Cell cell, MemStoreSizing memstoreSizing) { - Cell toAdd = maybeCloneWithAllocator(cell, false); + doAddOrUpsert(cell, 0, memstoreSizing, true); } + + /* + * Inserts the specified Cell into MemStore and deletes any existing + * versions of the same row/family/qualifier as the specified Cell. + *
+ * First, the specified Cell is inserted into the Memstore. + *
+ * If there are any existing Cells in this MemStore with the same row, + * family, and qualifier, they are removed. + *
+ * Callers must hold the read lock. + * + * @param cell the cell to be updated + * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param memstoreSizing object to accumulate changed size + */ + private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) { + doAddOrUpsert(cell, readpoint, memstoreSizing, false); + } + + private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing, boolean + doAdd) { + MutableSegment currentActive; + boolean succ = false; + while (!succ) { + currentActive = getActive(); + succ = preUpdate(currentActive, cell, memstoreSizing); + if (succ) { + if(doAdd) { + doAdd(currentActive, cell, memstoreSizing); + } else { + doUpsert(currentActive, cell, readpoint, memstoreSizing); + } + postUpdate(currentActive); + } + } + } + + private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) { + Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false); boolean mslabUsed = (toAdd != cell); - // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By - // default MSLAB is ON and we might have copied cell to MSLAB area. If not we must do below deep - // copy. Or else we will keep referring to the bigger chunk of memory and prevent it from - // getting GCed. + // This cell data is backed by the same byte[] where we read request in RPC(See + // HBASE-15180). By default MSLAB is ON and we might have copied cell to MSLAB area. If + // not we must do below deep copy. Or else we will keep referring to the bigger chunk of + // memory and prevent it from getting GCed. // Copy to MSLAB would not have happened if // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled" // 2. When the size of the cell is bigger than the max size supported by MSLAB. See @@ -116,9 +156,42 @@ public abstract class AbstractMemStore implements MemStore { if (!mslabUsed) { toAdd = deepCopyIfNeeded(toAdd); } - internalAdd(toAdd, mslabUsed, memstoreSizing); + internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing); } + private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint, MemStoreSizing + memstoreSizing) { + // Add the Cell to the MemStore + // Use the internalAdd method here since we (a) already have a lock + // and (b) cannot safely use the MSLAB here without potentially + // hitting OOME - see TestMemStore.testUpsertMSLAB for a + // test that triggers the pathological case if we don't avoid MSLAB + // here. + // This cell data is backed by the same byte[] where we read request in RPC(See + // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger + // chunk of memory and prevent it from getting GCed. 
+ cell = deepCopyIfNeeded(cell); + boolean sizeAddedPreOperation = sizeAddedPreOperation(); + currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation); + setOldestEditTimeToNow(); + } + + /** + * Issue any synchronization and test needed before applying the update + * @param currentActive the segment to be updated + * @param cell the cell to be added + * @param memstoreSizing object to accumulate region size changes + * @return true iff can proceed with applying the update + */ + protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell, + MemStoreSizing memstoreSizing); + + /** + * Issue any post update synchronization and tests + * @param currentActive updated segment + */ + protected abstract void postUpdate(MutableSegment currentActive); + private static Cell deepCopyIfNeeded(Cell cell) { if (cell instanceof ExtendedCell) { return ((ExtendedCell) cell).deepClone(); @@ -188,42 +261,11 @@ public abstract class AbstractMemStore implements MemStore { } protected void dump(Logger log) { - active.dump(log); + getActive().dump(log); snapshot.dump(log); } - /* - * Inserts the specified Cell into MemStore and deletes any existing - * versions of the same row/family/qualifier as the specified Cell. - *
- * First, the specified Cell is inserted into the Memstore. - *
- * If there are any existing Cell in this MemStore with the same row, - * family, and qualifier, they are removed. - *
- * Callers must hold the read lock. - * - * @param cell the cell to be updated - * @param readpoint readpoint below which we can safely remove duplicate KVs - * @param memstoreSize - */ - private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) { - // Add the Cell to the MemStore - // Use the internalAdd method here since we (a) already have a lock - // and (b) cannot safely use the MSLAB here without potentially - // hitting OOME - see TestMemStore.testUpsertMSLAB for a - // test that triggers the pathological case if we don't avoid MSLAB - // here. - // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). We - // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and - // prevent it from getting GCed. - cell = deepCopyIfNeeded(cell); - this.active.upsert(cell, readpoint, memstoreSizing); - setOldestEditTimeToNow(); - checkActiveSize(); - } - /* * @param a * @param b @@ -275,8 +317,9 @@ public abstract class AbstractMemStore implements MemStore { * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap. * @return either the given cell or its clone */ - private Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) { - return active.maybeCloneWithAllocator(cell, forceCloneOfBigCell); + private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell, boolean + forceCloneOfBigCell) { + return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell); } /* @@ -286,14 +329,17 @@ public abstract class AbstractMemStore implements MemStore { * Callers should ensure they already have the read lock taken * @param toAdd the cell to add * @param mslabUsed whether using MSLAB - * @param memstoreSize + * @param memstoreSizing object to accumulate changed size */ - private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) { - active.add(toAdd, mslabUsed, memstoreSizing); + private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean + mslabUsed, MemStoreSizing memstoreSizing) { + boolean sizeAddedPreOperation = sizeAddedPreOperation(); + currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation); setOldestEditTimeToNow(); - checkActiveSize(); } + protected abstract boolean sizeAddedPreOperation(); + private void setOldestEditTimeToNow() { if (timeOfOldestEdit == Long.MAX_VALUE) { timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); @@ -325,11 +371,6 @@ public abstract class AbstractMemStore implements MemStore { return snapshot; } - /** - * Check whether anything need to be done based on the current active set size - */ - protected abstract void checkActiveSize(); - /** * @return an ordered list of segments from most recent to oldest in memstore */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java index dadfc48633a..2f025554242 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java @@ -55,11 +55,12 @@ public class CellArrayImmutableSegment extends ImmutableSegment { * of CSLMImmutableSegment * The given iterator returns the Cells that "survived" the compaction. 
*/ - protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing, + protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing mss, MemStoreCompactionStrategy.Action action) { super(segment); // initiailize the upper class long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; incMemStoreSize(0, indexOverhead, 0); // CAM is always on-heap + mss.incMemStoreSize(0, indexOverhead, 0); int numOfCells = segment.getCellsCount(); // build the new CellSet based on CellChunkMap and update the CellSet of this Segment reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(), @@ -68,7 +69,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment { // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes) long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); incMemStoreSize(0, newSegmentSizeDelta, 0); - memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0); + mss.incMemStoreSize(0, newSegmentSizeDelta, 0); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java index e2f82051c75..eed97faf372 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java @@ -80,13 +80,16 @@ public class CellChunkImmutableSegment extends ImmutableSegment { // initiate the heapSize with the size of the segment metadata if(onHeap) { incMemStoreSize(0, indexOverhead, 0); + memstoreSizing.incMemStoreSize(0, indexOverhead, 0); } else { incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM); + memstoreSizing.incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, + DEEP_OVERHEAD_CCM); } int numOfCells = segment.getCellsCount(); // build the new CellSet based on CellChunkMap reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(), - action); + memstoreSizing, action); // arrange the meta-data size, decrease all meta-data sizes related to SkipList; // add sizes of CellChunkMap entry, decrease also Cell object sizes // (reinitializeCellSet doesn't take the care for the sizes) @@ -150,7 +153,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { // CellChunkMap assumes all cells are allocated on MSLAB. // Therefore, cells which are not allocated on MSLAB initially, // are copied into MSLAB here. - c = copyCellIntoMSLAB(c); + c = copyCellIntoMSLAB(c, null); //no memstore sizing object to update alreadyCopied = true; } if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunks[currentChunkIdx].size) { @@ -197,7 +200,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { // This is a service for not-flat immutable segments private void reinitializeCellSet( int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet, - MemStoreCompactionStrategy.Action action) { + MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) { Cell curCell; Chunk[] chunks = allocIndexChunks(numOfCells); @@ -213,7 +216,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { // CellChunkMap assumes all cells are allocated on MSLAB. 
// Therefore, cells which are not allocated on MSLAB initially, // are copied into MSLAB here. - curCell = copyCellIntoMSLAB(curCell); + curCell = copyCellIntoMSLAB(curCell, memstoreSizing); } if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunks[currentChunkIdx].size) { // continue to the next metadata chunk @@ -315,7 +318,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { return chunks; } - private Cell copyCellIntoMSLAB(Cell cell) { + private Cell copyCellIntoMSLAB(Cell cell, MemStoreSizing memstoreSizing) { // Take care for a special case when a cell is copied from on-heap to (probably off-heap) MSLAB. // The cell allocated as an on-heap JVM object (byte array) occupies slightly different // amount of memory, than when the cell serialized and allocated on the MSLAB. @@ -332,8 +335,10 @@ public class CellChunkImmutableSegment extends ImmutableSegment { long newCellSize = getCellLength(cell); long heapOverhead = newHeapSize - oldHeapSize; long offHeapOverhead = newOffHeapSize - oldOffHeapSize; - //TODO: maybe need to update the dataSize of the region incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead); + if(memstoreSizing != null) { + memstoreSizing.incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead); + } return cell; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 3886e7d9282..157441de6f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -62,7 +62,7 @@ public class CompactingMemStore extends AbstractMemStore { // Default fraction of in-memory-flush size w.r.t. 
flush-to-disk size public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = "hbase.memstore.inmemoryflush.threshold.factor"; - private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014; + private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1; private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class); private HStore store; @@ -71,7 +71,7 @@ public class CompactingMemStore extends AbstractMemStore { protected MemStoreCompactor compactor; private long inmemoryFlushSize; // the threshold on active size for in-memory flush - private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); + private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false); // inWalReplay is true while we are synchronously replaying the edits from WAL private boolean inWalReplay = false; @@ -94,11 +94,11 @@ public class CompactingMemStore extends AbstractMemStore { public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD + 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, - // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction, - // indexType + // MemStoreCompactor, inMemoryCompactionInProgress, + // allowCompaction, indexType + Bytes.SIZEOF_LONG // inmemoryFlushSize + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay - + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction + + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and allowCompaction + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD); public CompactingMemStore(Configuration conf, CellComparator c, @@ -139,12 +139,15 @@ public class CompactingMemStore extends AbstractMemStore { // Family number might also be zero in some of our unit test case numStores = 1; } - inmemoryFlushSize = memstoreFlushSize / numStores; - // multiply by a factor (the same factor for all index types) - factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, - IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT); - - inmemoryFlushSize = (long) (inmemoryFlushSize * factor); + factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0); + if(factor != 0.0) { + // multiply by a factor (the same factor for all index types) + inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores; + } else { + inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER * + conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); + inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER; + } } /** @@ -156,7 +159,7 @@ public class CompactingMemStore extends AbstractMemStore { @Override public MemStoreSize size() { MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing(); - memstoreSizing.incMemStoreSize(active.getMemStoreSize()); + memstoreSizing.incMemStoreSize(getActive().getMemStoreSize()); for (Segment item : pipeline.getSegments()) { memstoreSizing.incMemStoreSize(item.getMemStoreSize()); } @@ -201,9 +204,11 @@ public class CompactingMemStore extends AbstractMemStore { "Doing nothing. 
Another ongoing flush or did we fail last attempt?"); } else { LOG.debug("FLUSHING TO DISK {}, store={}", - getRegionServices().getRegionInfo().getEncodedName(), getFamilyName()); + getRegionServices().getRegionInfo().getEncodedName(), getFamilyName()); stopCompaction(); - pushActiveToPipeline(this.active); + // region level lock ensures pushing active to pipeline is done in isolation + // no concurrent update operations trying to flush the active segment + pushActiveToPipeline(getActive()); snapshotId = EnvironmentEdgeManager.currentTime(); // in both cases whatever is pushed to snapshot is cleared from the pipeline if (compositeSnapshot) { @@ -223,19 +228,22 @@ public class CompactingMemStore extends AbstractMemStore { // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed if (compositeSnapshot) { MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize()); - memStoreSizing.incMemStoreSize(this.active.getMemStoreSize()); + MutableSegment currActive = getActive(); + if(!currActive.isEmpty()) { + memStoreSizing.incMemStoreSize(currActive.getMemStoreSize()); + } mss = memStoreSizing.getMemStoreSize(); } else { mss = pipeline.getTailSize(); } } - return mss.getDataSize() > 0? mss: this.active.getMemStoreSize(); + return mss.getDataSize() > 0? mss: getActive().getMemStoreSize(); } @Override protected long keySize() { // Need to consider dataSize/keySize of all segments in pipeline and active - long keySize = this.active.getDataSize(); + long keySize = getActive().getDataSize(); for (Segment segment : this.pipeline.getSegments()) { keySize += segment.getDataSize(); } @@ -245,7 +253,7 @@ public class CompactingMemStore extends AbstractMemStore { @Override protected long heapSize() { // Need to consider heapOverhead of all segments in pipeline and active - long h = this.active.getHeapSize(); + long h = getActive().getHeapSize(); for (Segment segment : this.pipeline.getSegments()) { h += segment.getHeapSize(); } @@ -283,15 +291,43 @@ public class CompactingMemStore extends AbstractMemStore { inWalReplay = false; } + /** + * Issue any synchronization and test needed before applying the update + * For compacting memstore this means checking the update can increase the size without + * overflow + * @param currentActive the segment to be updated + * @param cell the cell to be added + * @param memstoreSizing object to accumulate region size changes + * @return true iff can proceed with applying the update + */ + @Override protected boolean preUpdate(MutableSegment currentActive, Cell cell, + MemStoreSizing memstoreSizing) { + if(currentActive.sharedLock()) { + if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) { + return true; + } + currentActive.sharedUnlock(); + } + return false; + } + + @Override protected void postUpdate(MutableSegment currentActive) { + currentActive.sharedUnlock(); + } + + @Override protected boolean sizeAddedPreOperation() { + return true; + } + // the getSegments() method is used for tests only @VisibleForTesting @Override protected List getSegments() { List pipelineList = pipeline.getSegments(); List list = new ArrayList<>(pipelineList.size() + 2); - list.add(this.active); + list.add(getActive()); list.addAll(pipelineList); - list.addAll(this.snapshot.getAllSegments()); + list.addAll(snapshot.getAllSegments()); return list; } @@ -351,7 +387,7 @@ public class CompactingMemStore extends AbstractMemStore { @Override public List getScanners(long readPt) throws IOException { - MutableSegment 
activeTmp = active; + MutableSegment activeTmp = getActive(); List pipelineList = pipeline.getSegments(); List snapshotList = snapshot.getAllSegments(); long numberOfSegments = 1L + pipelineList.size() + snapshotList.size(); @@ -363,56 +399,67 @@ public class CompactingMemStore extends AbstractMemStore { return list; } - @VisibleForTesting - protected List createList(int capacity) { - return new ArrayList<>(capacity); - } + @VisibleForTesting + protected List createList(int capacity) { + return new ArrayList<>(capacity); + } /** * Check whether anything need to be done based on the current active set size. * The method is invoked upon every addition to the active set. * For CompactingMemStore, flush the active set to the read-only memory if it's * size is above threshold + * @param currActive intended segment to update + * @param cellToAdd cell to be added to the segment + * @param memstoreSizing object to accumulate changed size + * @return true if the cell can be added to the */ - @Override - protected void checkActiveSize() { - if (shouldFlushInMemory()) { - /* The thread is dispatched to flush-in-memory. This cannot be done - * on the same thread, because for flush-in-memory we require updatesLock - * in exclusive mode while this method (checkActiveSize) is invoked holding updatesLock - * in the shared mode. */ - InMemoryFlushRunnable runnable = new InMemoryFlushRunnable(); - if (LOG.isTraceEnabled()) { - LOG.trace( - "Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName()); + private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, + MemStoreSizing memstoreSizing) { + if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) { + if (currActive.setInMemoryFlushed()) { + flushInMemory(currActive); + if (inMemoryCompactionInProgress.compareAndSet(false, true)) { + // The thread is dispatched to do in-memory compaction in the background + InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable(); + if (LOG.isTraceEnabled()) { + LOG.trace("Dispatching the MemStore in-memory flush for store " + store + .getColumnFamilyName()); + } + getPool().execute(runnable); + } } - getPool().execute(runnable); + return false; } - } + return true; + } - // internally used method, externally visible only for tests + // externally visible only for tests // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock, // otherwise there is a deadlock @VisibleForTesting - void flushInMemory() throws IOException { - // setting the inMemoryFlushInProgress flag again for the case this method is invoked - // directly (only in tests) in the common path setting from true to true is idempotent - inMemoryFlushInProgress.set(true); - try { - // Phase I: Update the pipeline - getRegionServices().blockUpdates(); - try { - LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline"); - pushActiveToPipeline(this.active); - } finally { - getRegionServices().unblockUpdates(); - } + void flushInMemory() { + MutableSegment currActive = getActive(); + if(currActive.setInMemoryFlushed()) { + flushInMemory(currActive); + } + inMemoryCompaction(); + } + private void flushInMemory(MutableSegment currActive) { + LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline"); + pushActiveToPipeline(currActive); + } + + void inMemoryCompaction() { + // setting the inMemoryCompactionInProgress flag again for the case this method is invoked + // directly (only in tests) in the common path setting 
from true to true is idempotent + inMemoryCompactionInProgress.set(true); + try { // Used by tests if (!allowCompaction.get()) { return; } - // Phase II: Compact the pipeline try { // Speculative compaction execution, may be interrupted if flush is forced while // compaction is in progress @@ -422,8 +469,7 @@ public class CompactingMemStore extends AbstractMemStore { getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e); } } finally { - inMemoryFlushInProgress.set(false); - LOG.trace("IN-MEMORY FLUSH: end"); + inMemoryCompactionInProgress.set(false); } } @@ -442,16 +488,24 @@ public class CompactingMemStore extends AbstractMemStore { } @VisibleForTesting - protected boolean shouldFlushInMemory() { - if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold - if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush - return false; // regardless the size + protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd, + MemStoreSizing memstoreSizing) { + long cellSize = currActive.getCellLength(cellToAdd); + long segmentDataSize = currActive.getDataSize(); + while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) { + // when replaying edits from WAL there is no need in in-memory flush regardless the size + // otherwise size below flush threshold try to update atomically + if(currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) { + if(memstoreSizing != null){ + memstoreSizing.incMemStoreSize(cellSize, 0, 0); + } + //enough space for cell - no need to flush + return false; } - // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude - // the insert of the active into the compaction pipeline - return (inMemoryFlushInProgress.compareAndSet(false,true)); + segmentDataSize = currActive.getDataSize(); } - return false; + // size above flush threshold + return true; } /** @@ -460,14 +514,14 @@ public class CompactingMemStore extends AbstractMemStore { * Non-blocking request */ private void stopCompaction() { - if (inMemoryFlushInProgress.get()) { + if (inMemoryCompactionInProgress.get()) { compactor.stop(); } } - protected void pushActiveToPipeline(MutableSegment active) { - if (!active.isEmpty()) { - pipeline.pushHead(active); + protected void pushActiveToPipeline(MutableSegment currActive) { + if (!currActive.isEmpty()) { + pipeline.pushHead(currActive); resetActive(); } } @@ -518,28 +572,21 @@ public class CompactingMemStore extends AbstractMemStore { } /** - * The in-memory-flusher thread performs the flush asynchronously. - * There is at most one thread per memstore instance. - * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock - * and compacts the pipeline. - */ - private class InMemoryFlushRunnable implements Runnable { - + * The in-memory-flusher thread performs the flush asynchronously. + * There is at most one thread per memstore instance. + * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock + * and compacts the pipeline. + */ + private class InMemoryCompactionRunnable implements Runnable { @Override public void run() { - try { - flushInMemory(); - } catch (IOException e) { - LOG.warn("Unable to run memstore compaction. 
region " - + getRegionServices().getRegionInfo().getRegionNameAsString() - + "store: "+ getFamilyName(), e); - } + inMemoryCompaction(); } } @VisibleForTesting boolean isMemStoreFlushingInMemory() { - return inMemoryFlushInProgress.get(); + return inMemoryCompactionInProgress.get(); } /** @@ -567,10 +614,10 @@ public class CompactingMemStore extends AbstractMemStore { // debug method public void debug() { - String msg = "active size=" + this.active.getDataSize(); - msg += " in-memory flush size is "+ inmemoryFlushSize; + String msg = "active size=" + getActive().getDataSize(); msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false"); - msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false"); + msg += " inMemoryCompactionInProgress is "+ (inMemoryCompactionInProgress.get() ? "true" : + "false"); LOG.debug(msg); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index f8aa3ef73fd..1131c3c98d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.yetus.audience.InterfaceAudience; @@ -151,15 +150,10 @@ public class CompactionPipeline { long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize; long heapSizeDelta = suffixHeapSize - newHeapSize; region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta); - LOG.debug("Suffix data size={}, new segment data size={}, " - + "suffix heap size={}," + "new segment heap size={}" - + "suffix off heap size={}," + "new segment off heap size={}" - , suffixDataSize - , newDataSize - , suffixHeapSize - , newHeapSize - , suffixOffHeapSize - , newOffHeapSize); + LOG.debug("Suffix data size={}, new segment data size={}, " + "suffix heap size={}," + + "new segment heap size={}" + "suffix off heap size={}," + + "new segment off heap size={}", suffixDataSize, newDataSize, suffixHeapSize, + newHeapSize, suffixOffHeapSize, newOffHeapSize); } return true; } @@ -214,6 +208,7 @@ public class CompactionPipeline { int i = 0; for (ImmutableSegment s : pipeline) { if ( s.canBeFlattened() ) { + s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed // size to be updated MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing(); ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( @@ -223,7 +218,6 @@ public class CompactionPipeline { // Update the global memstore size counter upon flattening there is no change in the // data size MemStoreSize mss = newMemstoreAccounting.getMemStoreSize(); - Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!"); region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize()); } LOG.debug("Compaction pipeline segment {} flattened", s); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index dcfaf812d4b..372c660670c 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -239,13 +239,14 @@ public class CompositeImmutableSegment extends ImmutableSegment { } @Override - protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) { + protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing, + boolean sizeAddedPreOperation) { throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } @Override protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed, - MemStoreSizing memstoreSizing) { + MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) { throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 5dcf48bb7eb..97170fbeab7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -87,9 +87,9 @@ public class DefaultMemStore extends AbstractMemStore { "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { this.snapshotId = EnvironmentEdgeManager.currentTime(); - if (!this.active.isEmpty()) { + if (!getActive().isEmpty()) { ImmutableSegment immutableSegment = SegmentFactory.instance(). - createImmutableSegment(this.active); + createImmutableSegment(getActive()); this.snapshot = immutableSegment; resetActive(); } @@ -100,17 +100,17 @@ public class DefaultMemStore extends AbstractMemStore { @Override public MemStoreSize getFlushableSize() { MemStoreSize mss = getSnapshotSize(); - return mss.getDataSize() > 0? mss: this.active.getMemStoreSize(); + return mss.getDataSize() > 0? 
mss: getActive().getMemStoreSize(); } @Override protected long keySize() { - return this.active.getDataSize(); + return getActive().getDataSize(); } @Override protected long heapSize() { - return this.active.getHeapSize(); + return getActive().getHeapSize(); } @Override @@ -119,7 +119,7 @@ public class DefaultMemStore extends AbstractMemStore { */ public List getScanners(long readPt) throws IOException { List list = new ArrayList<>(); - addToScanners(active, readPt, list); + addToScanners(getActive(), readPt, list); addToScanners(snapshot.getAllSegments(), readPt, list); return list; } @@ -127,8 +127,8 @@ public class DefaultMemStore extends AbstractMemStore { @Override protected List getSegments() throws IOException { List list = new ArrayList<>(2); - list.add(this.active); - list.add(this.snapshot); + list.add(getActive()); + list.add(snapshot); return list; } @@ -139,27 +139,31 @@ public class DefaultMemStore extends AbstractMemStore { */ Cell getNextRow(final Cell cell) { return getLowest( - getNextRow(cell, this.active.getCellSet()), + getNextRow(cell, this.getActive().getCellSet()), getNextRow(cell, this.snapshot.getCellSet())); } @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { } - @Override - public MemStoreSize size() { - return active.getMemStoreSize(); + @Override protected boolean preUpdate(MutableSegment currentActive, Cell cell, + MemStoreSizing memstoreSizing) { + return true; } - /** - * Check whether anything need to be done based on the current active set size - * Nothing need to be done for the DefaultMemStore - */ - @Override - protected void checkActiveSize() { + @Override protected void postUpdate(MutableSegment currentActive) { return; } + @Override protected boolean sizeAddedPreOperation() { + return false; + } + + @Override + public MemStoreSize size() { + return getActive().getMemStoreSize(); + } + @Override public long preFlushSeqIDEstimation() { return HConstants.NO_SEQNUM; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index cea85e94fbd..99737425216 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -207,10 +208,14 @@ public class MemStoreCompactor { ImmutableSegment result = null; MemStoreSegmentsIterator iterator = null; + List segments = versionedList.getStoreSegments(); + for (ImmutableSegment s : segments) { + s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed + } switch (action) { case COMPACT: - iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(), + iterator = new MemStoreCompactorSegmentsIterator(segments, compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore()); @@ -222,13 +227,12 @@ public class MemStoreCompactor { case MERGE: case MERGE_COUNT_UNIQUE_KEYS: iterator = - new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), + new MemStoreMergerSegmentsIterator(segments, compactingMemStore.getComparator(), compactionKVMax); result = SegmentFactory.instance().createImmutableSegmentByMerge( 
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, - versionedList.getNumOfCells(), versionedList.getStoreSegments(), - compactingMemStore.getIndexType(), action); + versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action); iterator.close(); break; default: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index ac7223f2d8d..ce775f8ff60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -120,8 +120,7 @@ public class MemStoreLABImpl implements MemStoreLAB { */ @Override public Cell forceCopyOfBigCellInto(Cell cell) { - int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize(): - KeyValueUtil.length(cell); + int size = Segment.getCellLength(cell); size += ChunkCreator.SIZEOF_CHUNK_HEADER; Preconditions.checkArgument(size >= 0, "negative size"); if (size <= dataChunkSize) { @@ -135,8 +134,7 @@ public class MemStoreLABImpl implements MemStoreLAB { } private Cell copyCellInto(Cell cell, int maxAlloc) { - int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize(): - KeyValueUtil.length(cell); + int size = Segment.getCellLength(cell); Preconditions.checkArgument(size >= 0, "negative size"); // Callers should satisfy large allocations directly from JVM since they // don't cause fragmentation as badly. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java index 8430ac6186c..22ca9b6abba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java @@ -81,6 +81,10 @@ public interface MemStoreSizing { long offHeapSizeDelta) { throw new RuntimeException("I'm a DUD, you can't use me!"); } + + @Override public boolean compareAndSetDataSize(long expected, long updated) { + throw new RuntimeException("I'm a DUD, you can't use me!"); + } }; /** @@ -104,6 +108,8 @@ public interface MemStoreSizing { return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), -delta.getOffHeapSize()); } + boolean compareAndSetDataSize(long expected, long updated); + long getDataSize(); long getHeapSize(); long getOffHeapSize(); @@ -113,4 +119,4 @@ public interface MemStoreSizing { * {@link #getHeapSize()}, and {@link #getOffHeapSize()}, in the one go. 
*/ MemStoreSize getMemStoreSize(); -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java index c72d38558fb..d76321d78a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.util.Iterator; import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -38,9 +39,13 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private public class MutableSegment extends Segment { - public final static long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD - + ClassSize.CONCURRENT_SKIPLISTMAP - + ClassSize.SYNC_TIMERANGE_TRACKER; + private final AtomicBoolean flushed = new AtomicBoolean(false); + + public final static long DEEP_OVERHEAD = ClassSize.align(Segment.DEEP_OVERHEAD + + ClassSize.CONCURRENT_SKIPLISTMAP + + ClassSize.SYNC_TIMERANGE_TRACKER + + ClassSize.REFERENCE + + ClassSize.ATOMIC_BOOLEAN); protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC)); @@ -52,12 +57,14 @@ public class MutableSegment extends Segment { * @param cell the cell to add * @param mslabUsed whether using MSLAB */ - public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing) { - internalAdd(cell, mslabUsed, memStoreSizing); + public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing, + boolean sizeAddedPreOperation) { + internalAdd(cell, mslabUsed, memStoreSizing, sizeAddedPreOperation); } - public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing) { - internalAdd(cell, false, memStoreSizing); + public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing, + boolean sizeAddedPreOperation) { + internalAdd(cell, false, memStoreSizing, sizeAddedPreOperation); // Get the Cells for the row/family/qualifier regardless of timestamp. 
// For this case we want to clean up any other puts @@ -105,6 +112,10 @@ public class MutableSegment extends Segment { } } + public boolean setInMemoryFlushed() { + return flushed.compareAndSet(false, true); + } + /** * Returns the first cell in the segment * @return the first cell in the segment diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java index 601ff33d58c..7b3b1d3c185 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java @@ -59,6 +59,14 @@ class NonThreadSafeMemStoreSizing implements MemStoreSizing { return this.dataSize; } + @Override public boolean compareAndSetDataSize(long expected, long updated) { + if(dataSize == expected) { + dataSize = updated; + return true; + } + return false; + } + @Override public long getDataSize() { return dataSize; @@ -78,4 +86,4 @@ class NonThreadSafeMemStoreSizing implements MemStoreSizing { public String toString() { return getMemStoreSize().toString(); } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java index b088856818d..31f2d85765d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java @@ -57,14 +57,6 @@ public class RegionServicesForStores { this.region = region; } - public void blockUpdates() { - region.blockUpdates(); - } - - public void unblockUpdates() { - region.unblockUpdates(); - } - public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 7069bf831e5..e68da1607a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -24,9 +24,11 @@ import java.util.List; import java.util.Objects; import java.util.SortedSet; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -48,15 +50,17 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti public abstract class Segment implements MemStoreSizing { public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT - + 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, memStoreSizing, + + 6 * ClassSize.REFERENCE // cellSet, comparator, updatesLock, memStoreLAB, memStoreSizing, // and timeRangeTracker + Bytes.SIZEOF_LONG // minSequenceId + Bytes.SIZEOF_BOOLEAN); // tagsPresent public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE - + ClassSize.CELL_SET + 2 * 
ClassSize.ATOMIC_LONG; + + ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG + + ClassSize.REENTRANT_LOCK; private AtomicReference cellSet= new AtomicReference<>(); private final CellComparator comparator; + private ReentrantReadWriteLock updatesLock; protected long minSequenceId; private MemStoreLAB memStoreLAB; // Sum of sizes of all Cells added to this Segment. Cell's HeapSize is considered. This is not @@ -87,6 +91,7 @@ public abstract class Segment implements MemStoreSizing { OffHeapSize += memStoreSize.getOffHeapSize(); } this.comparator = comparator; + this.updatesLock = new ReentrantReadWriteLock(); // Do we need to be thread safe always? What if ImmutableSegment? // DITTO for the TimeRangeTracker below. this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize); @@ -97,6 +102,7 @@ public abstract class Segment implements MemStoreSizing { protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, TimeRangeTracker trt) { this.cellSet.set(cellSet); this.comparator = comparator; + this.updatesLock = new ReentrantReadWriteLock(); this.minSequenceId = Long.MAX_VALUE; this.memStoreLAB = memStoreLAB; // Do we need to be thread safe always? What if ImmutableSegment? @@ -109,9 +115,10 @@ public abstract class Segment implements MemStoreSizing { protected Segment(Segment segment) { this.cellSet.set(segment.getCellSet()); this.comparator = segment.getComparator(); + this.updatesLock = segment.getUpdatesLock(); this.minSequenceId = segment.getMinSequenceId(); this.memStoreLAB = segment.getMemStoreLAB(); - this.memStoreSizing = new ThreadSafeMemStoreSizing(segment.memStoreSizing.getMemStoreSize()); + this.memStoreSizing = segment.memStoreSizing; this.tagsPresent = segment.isTagsPresent(); this.timeRangeTracker = segment.getTimeRangeTracker(); } @@ -183,7 +190,8 @@ public abstract class Segment implements MemStoreSizing { */ @VisibleForTesting static int getCellLength(Cell cell) { - return KeyValueUtil.length(cell); + return cell instanceof ExtendedCell ? 
((ExtendedCell)cell).getSerializedSize(): + KeyValueUtil.length(cell); } public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) { @@ -244,6 +252,25 @@ public abstract class Segment implements MemStoreSizing { return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead); } + public boolean sharedLock() { + return updatesLock.readLock().tryLock(); + } + + public void sharedUnlock() { + updatesLock.readLock().unlock(); + } + + public void waitForUpdates() { + if(!updatesLock.isWriteLocked()) { + updatesLock.writeLock().lock(); + } + } + + @Override + public boolean compareAndSetDataSize(long expected, long updated) { + return memStoreSizing.compareAndSetDataSize(expected, updated); + } + public long getMinSequenceId() { return minSequenceId; } @@ -288,25 +315,30 @@ public abstract class Segment implements MemStoreSizing { return comparator; } - protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) { + protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing, + boolean sizeAddedPreOperation) { boolean succ = getCellSet().add(cell); - updateMetaInfo(cell, succ, mslabUsed, memstoreSizing); + updateMetaInfo(cell, succ, mslabUsed, memstoreSizing, sizeAddedPreOperation); } protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed, - MemStoreSizing memstoreSizing) { - long cellSize = 0; + MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) { + long delta = 0; + long cellSize = getCellLength(cellToAdd); // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger // than the counted number) if (succ || mslabUsed) { - cellSize = getCellLength(cellToAdd); + delta = cellSize; } - long heapSize = heapSizeChange(cellToAdd, succ); - long offHeapSize = offHeapSizeChange(cellToAdd, succ); - incMemStoreSize(cellSize, heapSize, offHeapSize); + if(sizeAddedPreOperation) { + delta -= cellSize; + } + long heapSize = heapSizeChange(cellToAdd, succ || mslabUsed); + long offHeapSize = offHeapSizeChange(cellToAdd, succ || mslabUsed); + incMemStoreSize(delta, heapSize, offHeapSize); if (memstoreSizing != null) { - memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize); + memstoreSizing.incMemStoreSize(delta, heapSize, offHeapSize); } getTimeRangeTracker().includeTimestamp(cellToAdd); minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId()); @@ -320,16 +352,16 @@ public abstract class Segment implements MemStoreSizing { } protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSizing memstoreSizing) { - updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing); + updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing, false); } /** * @return The increase in heap size because of this cell addition. This includes this cell POJO's * heap size itself and additional overhead because of addition on to CSLM. 
*/ - protected long heapSizeChange(Cell cell, boolean succ) { + protected long heapSizeChange(Cell cell, boolean allocated) { long res = 0; - if (succ) { + if (allocated) { boolean onHeap = true; MemStoreLAB memStoreLAB = getMemStoreLAB(); if(memStoreLAB != null) { @@ -344,9 +376,9 @@ public abstract class Segment implements MemStoreSizing { return res; } - protected long offHeapSizeChange(Cell cell, boolean succ) { + protected long offHeapSizeChange(Cell cell, boolean allocated) { long res = 0; - if (succ) { + if (allocated) { boolean offHeap = false; MemStoreLAB memStoreLAB = getMemStoreLAB(); if(memStoreLAB != null) { @@ -410,4 +442,8 @@ public abstract class Segment implements MemStoreSizing { res += "max timestamp=" + timeRangeTracker.getMax(); return res; } + + private ReentrantReadWriteLock getUpdatesLock() { + return updatesLock; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java index de0549386e8..8e343d0c8db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java @@ -58,6 +58,10 @@ class ThreadSafeMemStoreSizing implements MemStoreSizing { return this.dataSize.addAndGet(dataSizeDelta); } + @Override public boolean compareAndSetDataSize(long expected, long updated) { + return dataSize.compareAndSet(expected,updated); + } + @Override public long getDataSize() { return dataSize.get(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 2d454e58c1c..993503d567b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -376,11 +376,13 @@ public class TestHeapSize { expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(AtomicReference.class, false); expected += ClassSize.estimateBase(CellSet.class, false); + expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false); if (expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(CellSet.class, true); + ClassSize.estimateBase(ReentrantReadWriteLock.class,true); assertEquals(expected, actual); } @@ -391,16 +393,20 @@ public class TestHeapSize { expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(AtomicReference.class, false); expected += ClassSize.estimateBase(CellSet.class, false); + expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false); expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false); expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); + expected += ClassSize.estimateBase(AtomicBoolean.class, false); if (expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(CellSet.class, true); + ClassSize.estimateBase(ReentrantReadWriteLock.class,true); ClassSize.estimateBase(SyncTimeRangeTracker.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true); + 
ClassSize.estimateBase(AtomicBoolean.class,true); assertEquals(expected, actual); } @@ -411,6 +417,7 @@ public class TestHeapSize { expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(AtomicReference.class, false); expected += ClassSize.estimateBase(CellSet.class, false); + expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false); expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false); if (expected != actual) { ClassSize.estimateBase(cl, true); @@ -418,6 +425,7 @@ public class TestHeapSize { ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(CellSet.class, true); + ClassSize.estimateBase(ReentrantReadWriteLock.class,true); ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true); assertEquals(expected, actual); } @@ -428,6 +436,7 @@ public class TestHeapSize { expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(AtomicReference.class, false); expected += ClassSize.estimateBase(CellSet.class, false); + expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false); expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false); expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); if (expected != actual) { @@ -436,6 +445,7 @@ public class TestHeapSize { ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(CellSet.class, true); + ClassSize.estimateBase(ReentrantReadWriteLock.class,true); ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true); assertEquals(expected, actual); @@ -446,6 +456,7 @@ public class TestHeapSize { expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(AtomicReference.class, false); expected += ClassSize.estimateBase(CellSet.class, false); + expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false); expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false); expected += ClassSize.estimateBase(CellArrayMap.class, false); if (expected != actual) { @@ -454,6 +465,7 @@ public class TestHeapSize { ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(CellSet.class, true); + ClassSize.estimateBase(ReentrantReadWriteLock.class,true); ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true); ClassSize.estimateBase(CellArrayMap.class, true); assertEquals(expected, actual); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 8dbddb9e44c..ade8563ffb0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -838,7 +838,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { byte[] row = Bytes.toBytes(keys[i]); byte[] val = Bytes.toBytes(keys[i] + i); KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); - totalLen += kv.getLength(); + totalLen += Segment.getCellLength(kv); hmc.add(kv, null); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); } @@ -859,7 +859,7 @@ public class TestCompactingMemStore extends 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 8dbddb9e44c..ade8563ffb0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -838,7 +838,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       byte[] row = Bytes.toBytes(keys[i]);
       byte[] val = Bytes.toBytes(keys[i] + i);
       KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-      totalLen += kv.getLength();
+      totalLen += Segment.getCellLength(kv);
       hmc.add(kv, null);
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
     }
@@ -859,7 +859,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       Threads.sleep(1); // to make sure each kv gets a different ts
       byte[] row = Bytes.toBytes(keys[i]);
       KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-      totalLen += kv.getLength();
+      totalLen += Segment.getCellLength(kv);
       hmc.add(kv, null);
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
index 943757b1f8f..7997a45f4b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -757,7 +757,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     // set memstore to flat into CellChunkMap
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
-        String.valueOf(compactionType));
+        String.valueOf(compactionType));
     ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
     ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
@@ -796,11 +796,13 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
    // totalCellsLen should remain the same
     long oneCellOnCCMHeapSize =
-        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
+        (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
     totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
         + numOfCells * oneCellOnCCMHeapSize;
-    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
+    assertEquals(totalCellsLen+ChunkCreator.SIZEOF_CHUNK_HEADER, regionServicesForStores
+        .getMemStoreSize());
+
     assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

     MemStoreSize mss = memstore.getFlushableSize();
@@ -824,8 +826,9 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     // but smaller than the size of two cells.
     // Therefore, the two created cells are flattened together.
     totalHeapSize = MutableSegment.DEEP_OVERHEAD
-        + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
-        + 2 * oneCellOnCCMHeapSize;
+        + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+        + 1 * oneCellOnCSLMHeapSize
+        + 1 * oneCellOnCCMHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
   }
@@ -848,6 +851,8 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().setInt(MemStoreCompactionStrategy
         .COMPACTING_MEMSTORE_THRESHOLD_KEY, 4);
+    memstore.getConfiguration()
+        .setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.014);
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
     ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
@@ -860,39 +865,42 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     String bigVal = new String(chars);
     byte[] val = Bytes.toBytes(bigVal);

-    // We need to add two cells, five times, in order to guarantee a merge
+    // We need to add two cells, three times, in order to guarantee a merge
     List<String[]> keysList = new ArrayList<>();
     keysList.add(new String[]{"A", "B"});
     keysList.add(new String[]{"C", "D"});
     keysList.add(new String[]{"E", "F"});
     keysList.add(new String[]{"G", "H"});
-    keysList.add(new String[]{"I", "J"});

     // Measuring the size of a single kv
     KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
         Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val);
     long oneCellOnCCMHeapSize =
-        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
-
+        (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
+    long oneCellOnCSLMHeapSize =
+        ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
     long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < keysList.size(); i++) {
       addRowsByKeys(memstore, keysList.get(i), val);
       while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
         Threads.sleep(10);
       }
-      // The in-memory flush size is bigger than the size of a single cell,
-      // but smaller than the size of two cells.
-      // Therefore, the two created cells are flattened together.
-      totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
-          + 2 * oneCellOnCCMHeapSize;
-      if (i == 4) {
-        // Four out of the five are merged into one,
-        // and the segment becomes immutable
-        totalHeapSize -= (3 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
-            + MutableSegment.DEEP_OVERHEAD);
+      if(i==0) {
+        totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+            + oneCellOnCCMHeapSize + oneCellOnCSLMHeapSize;
+      } else {
+        // The in-memory flush size is bigger than the size of a single cell,
+        // but smaller than the size of two cells.
+        // Therefore, the two created cells are flattened in a separate segment.
+        totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize);
       }
-      assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
+      if (i == 2) {
+        // Four out of the five segments are merged into one
+        totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
+        totalHeapSize = ClassSize.align(totalHeapSize);
+      }
+      assertEquals("i="+i, totalHeapSize, ((CompactingMemStore) memstore).heapSize());
     }
   }
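The test above distinguishes two per-cell heap estimates: a cell indexed by the ConcurrentSkipListMap of a mutable segment pays a skip-list entry plus the KeyValue object's own heap size, while a cell in a flattened CellChunkMap segment pays a fixed chunk-map entry plus its aligned serialized length. A standalone sketch of the same arithmetic; the class name and printed output are illustrative only:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;

// Sketch only: per-cell heap footprint in a CSLM-indexed segment vs. a CellChunkMap segment.
public class PerCellHeapSize {
  public static void main(String[] args) {
    KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
        Bytes.toBytes("testqualifier"), System.currentTimeMillis(), Bytes.toBytes("val"));

    // skip-list entry overhead plus the on-heap KeyValue object
    long onCSLM = ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
    // fixed chunk-map entry plus the cell's serialized length, aligned
    long onCCM = (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));

    System.out.println("CSLM per-cell heap = " + onCSLM + ", CellChunkMap per-cell heap = " + onCCM);
  }
}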
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 6803003672a..12df8f113de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -1765,15 +1765,12 @@ public class TestHStore {
     }

     @Override
-    protected boolean shouldFlushInMemory() {
-      boolean rval = super.shouldFlushInMemory();
-      if (rval) {
-        RUNNER_COUNT.incrementAndGet();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("runner count: " + RUNNER_COUNT.get());
-        }
+    void inMemoryCompaction() {
+      RUNNER_COUNT.incrementAndGet();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("runner count: " + RUNNER_COUNT.get());
       }
-      return rval;
+      super.inMemoryCompaction();
     }
   }
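With this change the test counts actual inMemoryCompaction() invocations rather than positive shouldFlushInMemory() checks, so RUNNER_COUNT now reflects compactions that were really triggered. The pattern in isolation; the class names below are stand-ins for the test's MyCompactingMemStore and are not part of the patch:

import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: override the hook that starts an in-memory compaction, count the call,
// then delegate to the real logic so behavior is unchanged.
class CompactingStoreSketch {
  void inMemoryCompaction() {
    // the real implementation would hand work to the compaction executor here
  }
}

class CountingCompactingStore extends CompactingStoreSketch {
  static final AtomicInteger RUNNER_COUNT = new AtomicInteger();

  @Override
  void inMemoryCompaction() {
    RUNNER_COUNT.incrementAndGet(); // observe how many compactions were triggered
    super.inMemoryCompaction();
  }
}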