HBASE-20542: Better heap utilization for IMC with MSLABs

This commit is contained in:
eshcar 2018-07-01 15:52:30 +03:00
parent 490ca0ce20
commit 7868f0ef97
19 changed files with 418 additions and 249 deletions

View File

@ -47,7 +47,7 @@ public abstract class AbstractMemStore implements MemStore {
private final CellComparator comparator;
// active segment absorbs write operations
protected volatile MutableSegment active;
private volatile MutableSegment active;
// Snapshot of memstore. Made for flusher.
protected volatile ImmutableSegment snapshot;
protected volatile long snapshotId;
@ -82,8 +82,8 @@ public abstract class AbstractMemStore implements MemStore {
protected void resetActive() {
// Reset heap to not include any keys
this.active = SegmentFactory.instance().createMutableSegment(conf, comparator);
this.timeOfOldestEdit = Long.MAX_VALUE;
active = SegmentFactory.instance().createMutableSegment(conf, comparator);
timeOfOldestEdit = Long.MAX_VALUE;
}
/**
@ -102,12 +102,52 @@ public abstract class AbstractMemStore implements MemStore {
@Override
public void add(Cell cell, MemStoreSizing memstoreSizing) {
Cell toAdd = maybeCloneWithAllocator(cell, false);
doAddOrUpsert(cell, 0, memstoreSizing, true); }
/*
* Inserts the specified Cell into MemStore and deletes any existing
* versions of the same row/family/qualifier as the specified Cell.
* <p>
* First, the specified Cell is inserted into the Memstore.
* <p>
* If there are any existing Cell in this MemStore with the same row,
* family, and qualifier, they are removed.
* <p>
* Callers must hold the read lock.
*
* @param cell the cell to be updated
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @param memstoreSizing object to accumulate changed size
*/
private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
doAddOrUpsert(cell, readpoint, memstoreSizing, false);
}
private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing, boolean
doAdd) {
MutableSegment currentActive;
boolean succ = false;
while (!succ) {
currentActive = getActive();
succ = preUpdate(currentActive, cell, memstoreSizing);
if (succ) {
if(doAdd) {
doAdd(currentActive, cell, memstoreSizing);
} else {
doUpsert(currentActive, cell, readpoint, memstoreSizing);
}
postUpdate(currentActive);
}
}
}
private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
boolean mslabUsed = (toAdd != cell);
// This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
// default MSLAB is ON and we might have copied cell to MSLAB area. If not we must do below deep
// copy. Or else we will keep referring to the bigger chunk of memory and prevent it from
// getting GCed.
// This cell data is backed by the same byte[] where we read request in RPC(See
// HBASE-15180). By default MSLAB is ON and we might have copied cell to MSLAB area. If
// not we must do below deep copy. Or else we will keep referring to the bigger chunk of
// memory and prevent it from getting GCed.
// Copy to MSLAB would not have happened if
// 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
// 2. When the size of the cell is bigger than the max size supported by MSLAB. See
@ -116,9 +156,42 @@ public abstract class AbstractMemStore implements MemStore {
if (!mslabUsed) {
toAdd = deepCopyIfNeeded(toAdd);
}
internalAdd(toAdd, mslabUsed, memstoreSizing);
internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
}
private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint, MemStoreSizing
memstoreSizing) {
// Add the Cell to the MemStore
// Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially
// hitting OOME - see TestMemStore.testUpsertMSLAB for a
// test that triggers the pathological case if we don't avoid MSLAB
// here.
// This cell data is backed by the same byte[] where we read request in RPC(See
// HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
// chunk of memory and prevent it from getting GCed.
cell = deepCopyIfNeeded(cell);
boolean sizeAddedPreOperation = sizeAddedPreOperation();
currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
setOldestEditTimeToNow();
}
/**
* Issue any synchronization and test needed before applying the update
* @param currentActive the segment to be updated
* @param cell the cell to be added
* @param memstoreSizing object to accumulate region size changes
* @return true iff can proceed with applying the update
*/
protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
MemStoreSizing memstoreSizing);
/**
* Issue any post update synchronization and tests
* @param currentActive updated segment
*/
protected abstract void postUpdate(MutableSegment currentActive);
private static Cell deepCopyIfNeeded(Cell cell) {
if (cell instanceof ExtendedCell) {
return ((ExtendedCell) cell).deepClone();
@ -188,42 +261,11 @@ public abstract class AbstractMemStore implements MemStore {
}
protected void dump(Logger log) {
active.dump(log);
getActive().dump(log);
snapshot.dump(log);
}
/*
* Inserts the specified Cell into MemStore and deletes any existing
* versions of the same row/family/qualifier as the specified Cell.
* <p>
* First, the specified Cell is inserted into the Memstore.
* <p>
* If there are any existing Cell in this MemStore with the same row,
* family, and qualifier, they are removed.
* <p>
* Callers must hold the read lock.
*
* @param cell the cell to be updated
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @param memstoreSize
*/
private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
// Add the Cell to the MemStore
// Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially
// hitting OOME - see TestMemStore.testUpsertMSLAB for a
// test that triggers the pathological case if we don't avoid MSLAB
// here.
// This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). We
// must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
// prevent it from getting GCed.
cell = deepCopyIfNeeded(cell);
this.active.upsert(cell, readpoint, memstoreSizing);
setOldestEditTimeToNow();
checkActiveSize();
}
/*
* @param a
* @param b
@ -275,8 +317,9 @@ public abstract class AbstractMemStore implements MemStore {
* @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
* @return either the given cell or its clone
*/
private Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) {
return active.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell, boolean
forceCloneOfBigCell) {
return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
}
/*
@ -286,14 +329,17 @@ public abstract class AbstractMemStore implements MemStore {
* Callers should ensure they already have the read lock taken
* @param toAdd the cell to add
* @param mslabUsed whether using MSLAB
* @param memstoreSize
* @param memstoreSizing object to accumulate changed size
*/
private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) {
active.add(toAdd, mslabUsed, memstoreSizing);
private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean
mslabUsed, MemStoreSizing memstoreSizing) {
boolean sizeAddedPreOperation = sizeAddedPreOperation();
currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
setOldestEditTimeToNow();
checkActiveSize();
}
protected abstract boolean sizeAddedPreOperation();
private void setOldestEditTimeToNow() {
if (timeOfOldestEdit == Long.MAX_VALUE) {
timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
@ -325,11 +371,6 @@ public abstract class AbstractMemStore implements MemStore {
return snapshot;
}
/**
* Check whether anything need to be done based on the current active set size
*/
protected abstract void checkActiveSize();
/**
* @return an ordered list of segments from most recent to oldest in memstore
*/

View File

@ -55,11 +55,12 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
* of CSLMImmutableSegment
* The given iterator returns the Cells that "survived" the compaction.
*/
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing,
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing mss,
MemStoreCompactionStrategy.Action action) {
super(segment); // initiailize the upper class
long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
incMemStoreSize(0, indexOverhead, 0); // CAM is always on-heap
mss.incMemStoreSize(0, indexOverhead, 0);
int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap and update the CellSet of this Segment
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
@ -68,7 +69,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
incMemStoreSize(0, newSegmentSizeDelta, 0);
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
mss.incMemStoreSize(0, newSegmentSizeDelta, 0);
}
@Override

View File

@ -80,13 +80,16 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// initiate the heapSize with the size of the segment metadata
if(onHeap) {
incMemStoreSize(0, indexOverhead, 0);
memstoreSizing.incMemStoreSize(0, indexOverhead, 0);
} else {
incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM);
memstoreSizing.incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM,
DEEP_OVERHEAD_CCM);
}
int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
action);
memstoreSizing, action);
// arrange the meta-data size, decrease all meta-data sizes related to SkipList;
// add sizes of CellChunkMap entry, decrease also Cell object sizes
// (reinitializeCellSet doesn't take the care for the sizes)
@ -150,7 +153,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// CellChunkMap assumes all cells are allocated on MSLAB.
// Therefore, cells which are not allocated on MSLAB initially,
// are copied into MSLAB here.
c = copyCellIntoMSLAB(c);
c = copyCellIntoMSLAB(c, null); //no memstore sizing object to update
alreadyCopied = true;
}
if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunks[currentChunkIdx].size) {
@ -197,7 +200,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// This is a service for not-flat immutable segments
private void reinitializeCellSet(
int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
MemStoreCompactionStrategy.Action action) {
MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
Cell curCell;
Chunk[] chunks = allocIndexChunks(numOfCells);
@ -213,7 +216,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// CellChunkMap assumes all cells are allocated on MSLAB.
// Therefore, cells which are not allocated on MSLAB initially,
// are copied into MSLAB here.
curCell = copyCellIntoMSLAB(curCell);
curCell = copyCellIntoMSLAB(curCell, memstoreSizing);
}
if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunks[currentChunkIdx].size) {
// continue to the next metadata chunk
@ -315,7 +318,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
return chunks;
}
private Cell copyCellIntoMSLAB(Cell cell) {
private Cell copyCellIntoMSLAB(Cell cell, MemStoreSizing memstoreSizing) {
// Take care for a special case when a cell is copied from on-heap to (probably off-heap) MSLAB.
// The cell allocated as an on-heap JVM object (byte array) occupies slightly different
// amount of memory, than when the cell serialized and allocated on the MSLAB.
@ -332,8 +335,10 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
long newCellSize = getCellLength(cell);
long heapOverhead = newHeapSize - oldHeapSize;
long offHeapOverhead = newOffHeapSize - oldOffHeapSize;
//TODO: maybe need to update the dataSize of the region
incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
if(memstoreSizing != null) {
memstoreSizing.incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
}
return cell;
}
}

View File

@ -62,7 +62,7 @@ public class CompactingMemStore extends AbstractMemStore {
// Default fraction of in-memory-flush size w.r.t. flush-to-disk size
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
"hbase.memstore.inmemoryflush.threshold.factor";
private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014;
private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1;
private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
private HStore store;
@ -71,7 +71,7 @@ public class CompactingMemStore extends AbstractMemStore {
protected MemStoreCompactor compactor;
private long inmemoryFlushSize; // the threshold on active size for in-memory flush
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false);
// inWalReplay is true while we are synchronously replaying the edits from WAL
private boolean inWalReplay = false;
@ -94,11 +94,11 @@ public class CompactingMemStore extends AbstractMemStore {
public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
+ 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
// MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
// indexType
// MemStoreCompactor, inMemoryCompactionInProgress,
// allowCompaction, indexType
+ Bytes.SIZEOF_LONG // inmemoryFlushSize
+ 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
+ 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
+ 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and allowCompaction
+ CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
public CompactingMemStore(Configuration conf, CellComparator c,
@ -139,12 +139,15 @@ public class CompactingMemStore extends AbstractMemStore {
// Family number might also be zero in some of our unit test case
numStores = 1;
}
inmemoryFlushSize = memstoreFlushSize / numStores;
// multiply by a factor (the same factor for all index types)
factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT);
inmemoryFlushSize = (long) (inmemoryFlushSize * factor);
factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0);
if(factor != 0.0) {
// multiply by a factor (the same factor for all index types)
inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores;
} else {
inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER *
conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER;
}
}
/**
@ -156,7 +159,7 @@ public class CompactingMemStore extends AbstractMemStore {
@Override
public MemStoreSize size() {
MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
memstoreSizing.incMemStoreSize(active.getMemStoreSize());
memstoreSizing.incMemStoreSize(getActive().getMemStoreSize());
for (Segment item : pipeline.getSegments()) {
memstoreSizing.incMemStoreSize(item.getMemStoreSize());
}
@ -201,9 +204,11 @@ public class CompactingMemStore extends AbstractMemStore {
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
LOG.debug("FLUSHING TO DISK {}, store={}",
getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
stopCompaction();
pushActiveToPipeline(this.active);
// region level lock ensures pushing active to pipeline is done in isolation
// no concurrent update operations trying to flush the active segment
pushActiveToPipeline(getActive());
snapshotId = EnvironmentEdgeManager.currentTime();
// in both cases whatever is pushed to snapshot is cleared from the pipeline
if (compositeSnapshot) {
@ -223,19 +228,22 @@ public class CompactingMemStore extends AbstractMemStore {
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
if (compositeSnapshot) {
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
memStoreSizing.incMemStoreSize(this.active.getMemStoreSize());
MutableSegment currActive = getActive();
if(!currActive.isEmpty()) {
memStoreSizing.incMemStoreSize(currActive.getMemStoreSize());
}
mss = memStoreSizing.getMemStoreSize();
} else {
mss = pipeline.getTailSize();
}
}
return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
}
@Override
protected long keySize() {
// Need to consider dataSize/keySize of all segments in pipeline and active
long keySize = this.active.getDataSize();
long keySize = getActive().getDataSize();
for (Segment segment : this.pipeline.getSegments()) {
keySize += segment.getDataSize();
}
@ -245,7 +253,7 @@ public class CompactingMemStore extends AbstractMemStore {
@Override
protected long heapSize() {
// Need to consider heapOverhead of all segments in pipeline and active
long h = this.active.getHeapSize();
long h = getActive().getHeapSize();
for (Segment segment : this.pipeline.getSegments()) {
h += segment.getHeapSize();
}
@ -283,15 +291,43 @@ public class CompactingMemStore extends AbstractMemStore {
inWalReplay = false;
}
/**
* Issue any synchronization and test needed before applying the update
* For compacting memstore this means checking the update can increase the size without
* overflow
* @param currentActive the segment to be updated
* @param cell the cell to be added
* @param memstoreSizing object to accumulate region size changes
* @return true iff can proceed with applying the update
*/
@Override protected boolean preUpdate(MutableSegment currentActive, Cell cell,
MemStoreSizing memstoreSizing) {
if(currentActive.sharedLock()) {
if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) {
return true;
}
currentActive.sharedUnlock();
}
return false;
}
@Override protected void postUpdate(MutableSegment currentActive) {
currentActive.sharedUnlock();
}
@Override protected boolean sizeAddedPreOperation() {
return true;
}
// the getSegments() method is used for tests only
@VisibleForTesting
@Override
protected List<Segment> getSegments() {
List<? extends Segment> pipelineList = pipeline.getSegments();
List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
list.add(this.active);
list.add(getActive());
list.addAll(pipelineList);
list.addAll(this.snapshot.getAllSegments());
list.addAll(snapshot.getAllSegments());
return list;
}
@ -351,7 +387,7 @@ public class CompactingMemStore extends AbstractMemStore {
@Override
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
MutableSegment activeTmp = active;
MutableSegment activeTmp = getActive();
List<? extends Segment> pipelineList = pipeline.getSegments();
List<? extends Segment> snapshotList = snapshot.getAllSegments();
long numberOfSegments = 1L + pipelineList.size() + snapshotList.size();
@ -363,56 +399,67 @@ public class CompactingMemStore extends AbstractMemStore {
return list;
}
@VisibleForTesting
protected List<KeyValueScanner> createList(int capacity) {
return new ArrayList<>(capacity);
}
@VisibleForTesting
protected List<KeyValueScanner> createList(int capacity) {
return new ArrayList<>(capacity);
}
/**
* Check whether anything need to be done based on the current active set size.
* The method is invoked upon every addition to the active set.
* For CompactingMemStore, flush the active set to the read-only memory if it's
* size is above threshold
* @param currActive intended segment to update
* @param cellToAdd cell to be added to the segment
* @param memstoreSizing object to accumulate changed size
* @return true if the cell can be added to the
*/
@Override
protected void checkActiveSize() {
if (shouldFlushInMemory()) {
/* The thread is dispatched to flush-in-memory. This cannot be done
* on the same thread, because for flush-in-memory we require updatesLock
* in exclusive mode while this method (checkActiveSize) is invoked holding updatesLock
* in the shared mode. */
InMemoryFlushRunnable runnable = new InMemoryFlushRunnable();
if (LOG.isTraceEnabled()) {
LOG.trace(
"Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
MemStoreSizing memstoreSizing) {
if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
if (currActive.setInMemoryFlushed()) {
flushInMemory(currActive);
if (inMemoryCompactionInProgress.compareAndSet(false, true)) {
// The thread is dispatched to do in-memory compaction in the background
InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
if (LOG.isTraceEnabled()) {
LOG.trace("Dispatching the MemStore in-memory flush for store " + store
.getColumnFamilyName());
}
getPool().execute(runnable);
}
}
getPool().execute(runnable);
return false;
}
return true;
}
// internally used method, externally visible only for tests
// externally visible only for tests
// when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
// otherwise there is a deadlock
@VisibleForTesting
void flushInMemory() throws IOException {
// setting the inMemoryFlushInProgress flag again for the case this method is invoked
// directly (only in tests) in the common path setting from true to true is idempotent
inMemoryFlushInProgress.set(true);
try {
// Phase I: Update the pipeline
getRegionServices().blockUpdates();
try {
LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
pushActiveToPipeline(this.active);
} finally {
getRegionServices().unblockUpdates();
}
void flushInMemory() {
MutableSegment currActive = getActive();
if(currActive.setInMemoryFlushed()) {
flushInMemory(currActive);
}
inMemoryCompaction();
}
private void flushInMemory(MutableSegment currActive) {
LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
pushActiveToPipeline(currActive);
}
void inMemoryCompaction() {
// setting the inMemoryCompactionInProgress flag again for the case this method is invoked
// directly (only in tests) in the common path setting from true to true is idempotent
inMemoryCompactionInProgress.set(true);
try {
// Used by tests
if (!allowCompaction.get()) {
return;
}
// Phase II: Compact the pipeline
try {
// Speculative compaction execution, may be interrupted if flush is forced while
// compaction is in progress
@ -422,8 +469,7 @@ public class CompactingMemStore extends AbstractMemStore {
getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
}
} finally {
inMemoryFlushInProgress.set(false);
LOG.trace("IN-MEMORY FLUSH: end");
inMemoryCompactionInProgress.set(false);
}
}
@ -442,16 +488,24 @@ public class CompactingMemStore extends AbstractMemStore {
}
@VisibleForTesting
protected boolean shouldFlushInMemory() {
if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold
if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush
return false; // regardless the size
protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
MemStoreSizing memstoreSizing) {
long cellSize = currActive.getCellLength(cellToAdd);
long segmentDataSize = currActive.getDataSize();
while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
// when replaying edits from WAL there is no need in in-memory flush regardless the size
// otherwise size below flush threshold try to update atomically
if(currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
if(memstoreSizing != null){
memstoreSizing.incMemStoreSize(cellSize, 0, 0);
}
//enough space for cell - no need to flush
return false;
}
// the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude
// the insert of the active into the compaction pipeline
return (inMemoryFlushInProgress.compareAndSet(false,true));
segmentDataSize = currActive.getDataSize();
}
return false;
// size above flush threshold
return true;
}
/**
@ -460,14 +514,14 @@ public class CompactingMemStore extends AbstractMemStore {
* Non-blocking request
*/
private void stopCompaction() {
if (inMemoryFlushInProgress.get()) {
if (inMemoryCompactionInProgress.get()) {
compactor.stop();
}
}
protected void pushActiveToPipeline(MutableSegment active) {
if (!active.isEmpty()) {
pipeline.pushHead(active);
protected void pushActiveToPipeline(MutableSegment currActive) {
if (!currActive.isEmpty()) {
pipeline.pushHead(currActive);
resetActive();
}
}
@ -518,28 +572,21 @@ public class CompactingMemStore extends AbstractMemStore {
}
/**
* The in-memory-flusher thread performs the flush asynchronously.
* There is at most one thread per memstore instance.
* It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
* and compacts the pipeline.
*/
private class InMemoryFlushRunnable implements Runnable {
* The in-memory-flusher thread performs the flush asynchronously.
* There is at most one thread per memstore instance.
* It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
* and compacts the pipeline.
*/
private class InMemoryCompactionRunnable implements Runnable {
@Override
public void run() {
try {
flushInMemory();
} catch (IOException e) {
LOG.warn("Unable to run memstore compaction. region "
+ getRegionServices().getRegionInfo().getRegionNameAsString()
+ "store: "+ getFamilyName(), e);
}
inMemoryCompaction();
}
}
@VisibleForTesting
boolean isMemStoreFlushingInMemory() {
return inMemoryFlushInProgress.get();
return inMemoryCompactionInProgress.get();
}
/**
@ -567,10 +614,10 @@ public class CompactingMemStore extends AbstractMemStore {
// debug method
public void debug() {
String msg = "active size=" + this.active.getDataSize();
msg += " in-memory flush size is "+ inmemoryFlushSize;
String msg = "active size=" + getActive().getDataSize();
msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");
msg += " inMemoryCompactionInProgress is "+ (inMemoryCompactionInProgress.get() ? "true" :
"false");
LOG.debug(msg);
}

View File

@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
@ -151,15 +150,10 @@ public class CompactionPipeline {
long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
long heapSizeDelta = suffixHeapSize - newHeapSize;
region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
LOG.debug("Suffix data size={}, new segment data size={}, "
+ "suffix heap size={}," + "new segment heap size={}"
+ "suffix off heap size={}," + "new segment off heap size={}"
, suffixDataSize
, newDataSize
, suffixHeapSize
, newHeapSize
, suffixOffHeapSize
, newOffHeapSize);
LOG.debug("Suffix data size={}, new segment data size={}, " + "suffix heap size={},"
+ "new segment heap size={}" + "suffix off heap size={},"
+ "new segment off heap size={}", suffixDataSize, newDataSize, suffixHeapSize,
newHeapSize, suffixOffHeapSize, newOffHeapSize);
}
return true;
}
@ -214,6 +208,7 @@ public class CompactionPipeline {
int i = 0;
for (ImmutableSegment s : pipeline) {
if ( s.canBeFlattened() ) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
// size to be updated
MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
@ -223,7 +218,6 @@ public class CompactionPipeline {
// Update the global memstore size counter upon flattening there is no change in the
// data size
MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!");
region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
}
LOG.debug("Compaction pipeline segment {} flattened", s);

View File

@ -239,13 +239,14 @@ public class CompositeImmutableSegment extends ImmutableSegment {
}
@Override
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) {
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing,
boolean sizeAddedPreOperation) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
@Override
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
MemStoreSizing memstoreSizing) {
MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}

View File

@ -87,9 +87,9 @@ public class DefaultMemStore extends AbstractMemStore {
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
this.snapshotId = EnvironmentEdgeManager.currentTime();
if (!this.active.isEmpty()) {
if (!getActive().isEmpty()) {
ImmutableSegment immutableSegment = SegmentFactory.instance().
createImmutableSegment(this.active);
createImmutableSegment(getActive());
this.snapshot = immutableSegment;
resetActive();
}
@ -100,17 +100,17 @@ public class DefaultMemStore extends AbstractMemStore {
@Override
public MemStoreSize getFlushableSize() {
MemStoreSize mss = getSnapshotSize();
return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
}
@Override
protected long keySize() {
return this.active.getDataSize();
return getActive().getDataSize();
}
@Override
protected long heapSize() {
return this.active.getHeapSize();
return getActive().getHeapSize();
}
@Override
@ -119,7 +119,7 @@ public class DefaultMemStore extends AbstractMemStore {
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<KeyValueScanner> list = new ArrayList<>();
addToScanners(active, readPt, list);
addToScanners(getActive(), readPt, list);
addToScanners(snapshot.getAllSegments(), readPt, list);
return list;
}
@ -127,8 +127,8 @@ public class DefaultMemStore extends AbstractMemStore {
@Override
protected List<Segment> getSegments() throws IOException {
List<Segment> list = new ArrayList<>(2);
list.add(this.active);
list.add(this.snapshot);
list.add(getActive());
list.add(snapshot);
return list;
}
@ -139,27 +139,31 @@ public class DefaultMemStore extends AbstractMemStore {
*/
Cell getNextRow(final Cell cell) {
return getLowest(
getNextRow(cell, this.active.getCellSet()),
getNextRow(cell, this.getActive().getCellSet()),
getNextRow(cell, this.snapshot.getCellSet()));
}
@Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
}
@Override
public MemStoreSize size() {
return active.getMemStoreSize();
@Override protected boolean preUpdate(MutableSegment currentActive, Cell cell,
MemStoreSizing memstoreSizing) {
return true;
}
/**
* Check whether anything need to be done based on the current active set size
* Nothing need to be done for the DefaultMemStore
*/
@Override
protected void checkActiveSize() {
@Override protected void postUpdate(MutableSegment currentActive) {
return;
}
@Override protected boolean sizeAddedPreOperation() {
return false;
}
@Override
public MemStoreSize size() {
return getActive().getMemStoreSize();
}
@Override
public long preFlushSeqIDEstimation() {
return HConstants.NO_SEQNUM;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
@ -207,10 +208,14 @@ public class MemStoreCompactor {
ImmutableSegment result = null;
MemStoreSegmentsIterator iterator = null;
List<ImmutableSegment> segments = versionedList.getStoreSegments();
for (ImmutableSegment s : segments) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
}
switch (action) {
case COMPACT:
iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
iterator = new MemStoreCompactorSegmentsIterator(segments,
compactingMemStore.getComparator(),
compactionKVMax, compactingMemStore.getStore());
@ -222,13 +227,12 @@ public class MemStoreCompactor {
case MERGE:
case MERGE_COUNT_UNIQUE_KEYS:
iterator =
new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
new MemStoreMergerSegmentsIterator(segments,
compactingMemStore.getComparator(), compactionKVMax);
result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), versionedList.getStoreSegments(),
compactingMemStore.getIndexType(), action);
versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action);
iterator.close();
break;
default:

View File

@ -120,8 +120,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
*/
@Override
public Cell forceCopyOfBigCellInto(Cell cell) {
int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
KeyValueUtil.length(cell);
int size = Segment.getCellLength(cell);
size += ChunkCreator.SIZEOF_CHUNK_HEADER;
Preconditions.checkArgument(size >= 0, "negative size");
if (size <= dataChunkSize) {
@ -135,8 +134,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
}
private Cell copyCellInto(Cell cell, int maxAlloc) {
int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
KeyValueUtil.length(cell);
int size = Segment.getCellLength(cell);
Preconditions.checkArgument(size >= 0, "negative size");
// Callers should satisfy large allocations directly from JVM since they
// don't cause fragmentation as badly.

View File

@ -81,6 +81,10 @@ public interface MemStoreSizing {
long offHeapSizeDelta) {
throw new RuntimeException("I'm a DUD, you can't use me!");
}
@Override public boolean compareAndSetDataSize(long expected, long updated) {
throw new RuntimeException("I'm a DUD, you can't use me!");
}
};
/**
@ -104,6 +108,8 @@ public interface MemStoreSizing {
return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), -delta.getOffHeapSize());
}
boolean compareAndSetDataSize(long expected, long updated);
long getDataSize();
long getHeapSize();
long getOffHeapSize();
@ -113,4 +119,4 @@ public interface MemStoreSizing {
* {@link #getHeapSize()}, and {@link #getOffHeapSize()}, in the one go.
*/
MemStoreSize getMemStoreSize();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@ -38,9 +39,13 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
@InterfaceAudience.Private
public class MutableSegment extends Segment {
public final static long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
+ ClassSize.CONCURRENT_SKIPLISTMAP
+ ClassSize.SYNC_TIMERANGE_TRACKER;
private final AtomicBoolean flushed = new AtomicBoolean(false);
public final static long DEEP_OVERHEAD = ClassSize.align(Segment.DEEP_OVERHEAD
+ ClassSize.CONCURRENT_SKIPLISTMAP
+ ClassSize.SYNC_TIMERANGE_TRACKER
+ ClassSize.REFERENCE
+ ClassSize.ATOMIC_BOOLEAN);
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
@ -52,12 +57,14 @@ public class MutableSegment extends Segment {
* @param cell the cell to add
* @param mslabUsed whether using MSLAB
*/
public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing) {
internalAdd(cell, mslabUsed, memStoreSizing);
public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing,
boolean sizeAddedPreOperation) {
internalAdd(cell, mslabUsed, memStoreSizing, sizeAddedPreOperation);
}
public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing) {
internalAdd(cell, false, memStoreSizing);
public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing,
boolean sizeAddedPreOperation) {
internalAdd(cell, false, memStoreSizing, sizeAddedPreOperation);
// Get the Cells for the row/family/qualifier regardless of timestamp.
// For this case we want to clean up any other puts
@ -105,6 +112,10 @@ public class MutableSegment extends Segment {
}
}
public boolean setInMemoryFlushed() {
return flushed.compareAndSet(false, true);
}
/**
* Returns the first cell in the segment
* @return the first cell in the segment

View File

@ -59,6 +59,14 @@ class NonThreadSafeMemStoreSizing implements MemStoreSizing {
return this.dataSize;
}
@Override public boolean compareAndSetDataSize(long expected, long updated) {
if(dataSize == expected) {
dataSize = updated;
return true;
}
return false;
}
@Override
public long getDataSize() {
return dataSize;
@ -78,4 +86,4 @@ class NonThreadSafeMemStoreSizing implements MemStoreSizing {
public String toString() {
return getMemStoreSize().toString();
}
}
}

View File

@ -57,14 +57,6 @@ public class RegionServicesForStores {
this.region = region;
}
public void blockUpdates() {
region.blockUpdates();
}
public void unblockUpdates() {
region.unblockUpdates();
}
public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
}

View File

@ -24,9 +24,11 @@ import java.util.List;
import java.util.Objects;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
@ -48,15 +50,17 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
public abstract class Segment implements MemStoreSizing {
public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, memStoreSizing,
+ 6 * ClassSize.REFERENCE // cellSet, comparator, updatesLock, memStoreLAB, memStoreSizing,
// and timeRangeTracker
+ Bytes.SIZEOF_LONG // minSequenceId
+ Bytes.SIZEOF_BOOLEAN); // tagsPresent
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE
+ ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG;
+ ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG
+ ClassSize.REENTRANT_LOCK;
private AtomicReference<CellSet> cellSet= new AtomicReference<>();
private final CellComparator comparator;
private ReentrantReadWriteLock updatesLock;
protected long minSequenceId;
private MemStoreLAB memStoreLAB;
// Sum of sizes of all Cells added to this Segment. Cell's HeapSize is considered. This is not
@ -87,6 +91,7 @@ public abstract class Segment implements MemStoreSizing {
OffHeapSize += memStoreSize.getOffHeapSize();
}
this.comparator = comparator;
this.updatesLock = new ReentrantReadWriteLock();
// Do we need to be thread safe always? What if ImmutableSegment?
// DITTO for the TimeRangeTracker below.
this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize);
@ -97,6 +102,7 @@ public abstract class Segment implements MemStoreSizing {
protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, TimeRangeTracker trt) {
this.cellSet.set(cellSet);
this.comparator = comparator;
this.updatesLock = new ReentrantReadWriteLock();
this.minSequenceId = Long.MAX_VALUE;
this.memStoreLAB = memStoreLAB;
// Do we need to be thread safe always? What if ImmutableSegment?
@ -109,9 +115,10 @@ public abstract class Segment implements MemStoreSizing {
protected Segment(Segment segment) {
this.cellSet.set(segment.getCellSet());
this.comparator = segment.getComparator();
this.updatesLock = segment.getUpdatesLock();
this.minSequenceId = segment.getMinSequenceId();
this.memStoreLAB = segment.getMemStoreLAB();
this.memStoreSizing = new ThreadSafeMemStoreSizing(segment.memStoreSizing.getMemStoreSize());
this.memStoreSizing = segment.memStoreSizing;
this.tagsPresent = segment.isTagsPresent();
this.timeRangeTracker = segment.getTimeRangeTracker();
}
@ -183,7 +190,8 @@ public abstract class Segment implements MemStoreSizing {
*/
@VisibleForTesting
static int getCellLength(Cell cell) {
return KeyValueUtil.length(cell);
return cell instanceof ExtendedCell ? ((ExtendedCell)cell).getSerializedSize():
KeyValueUtil.length(cell);
}
public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
@ -244,6 +252,25 @@ public abstract class Segment implements MemStoreSizing {
return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
}
public boolean sharedLock() {
return updatesLock.readLock().tryLock();
}
public void sharedUnlock() {
updatesLock.readLock().unlock();
}
public void waitForUpdates() {
if(!updatesLock.isWriteLocked()) {
updatesLock.writeLock().lock();
}
}
@Override
public boolean compareAndSetDataSize(long expected, long updated) {
return memStoreSizing.compareAndSetDataSize(expected, updated);
}
public long getMinSequenceId() {
return minSequenceId;
}
@ -288,25 +315,30 @@ public abstract class Segment implements MemStoreSizing {
return comparator;
}
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) {
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing,
boolean sizeAddedPreOperation) {
boolean succ = getCellSet().add(cell);
updateMetaInfo(cell, succ, mslabUsed, memstoreSizing);
updateMetaInfo(cell, succ, mslabUsed, memstoreSizing, sizeAddedPreOperation);
}
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
MemStoreSizing memstoreSizing) {
long cellSize = 0;
MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) {
long delta = 0;
long cellSize = getCellLength(cellToAdd);
// If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
// MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
// than the counted number)
if (succ || mslabUsed) {
cellSize = getCellLength(cellToAdd);
delta = cellSize;
}
long heapSize = heapSizeChange(cellToAdd, succ);
long offHeapSize = offHeapSizeChange(cellToAdd, succ);
incMemStoreSize(cellSize, heapSize, offHeapSize);
if(sizeAddedPreOperation) {
delta -= cellSize;
}
long heapSize = heapSizeChange(cellToAdd, succ || mslabUsed);
long offHeapSize = offHeapSizeChange(cellToAdd, succ || mslabUsed);
incMemStoreSize(delta, heapSize, offHeapSize);
if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize);
memstoreSizing.incMemStoreSize(delta, heapSize, offHeapSize);
}
getTimeRangeTracker().includeTimestamp(cellToAdd);
minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
@ -320,16 +352,16 @@ public abstract class Segment implements MemStoreSizing {
}
protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSizing memstoreSizing) {
updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing);
updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing, false);
}
/**
* @return The increase in heap size because of this cell addition. This includes this cell POJO's
* heap size itself and additional overhead because of addition on to CSLM.
*/
protected long heapSizeChange(Cell cell, boolean succ) {
protected long heapSizeChange(Cell cell, boolean allocated) {
long res = 0;
if (succ) {
if (allocated) {
boolean onHeap = true;
MemStoreLAB memStoreLAB = getMemStoreLAB();
if(memStoreLAB != null) {
@ -344,9 +376,9 @@ public abstract class Segment implements MemStoreSizing {
return res;
}
protected long offHeapSizeChange(Cell cell, boolean succ) {
protected long offHeapSizeChange(Cell cell, boolean allocated) {
long res = 0;
if (succ) {
if (allocated) {
boolean offHeap = false;
MemStoreLAB memStoreLAB = getMemStoreLAB();
if(memStoreLAB != null) {
@ -410,4 +442,8 @@ public abstract class Segment implements MemStoreSizing {
res += "max timestamp=" + timeRangeTracker.getMax();
return res;
}
private ReentrantReadWriteLock getUpdatesLock() {
return updatesLock;
}
}

View File

@ -58,6 +58,10 @@ class ThreadSafeMemStoreSizing implements MemStoreSizing {
return this.dataSize.addAndGet(dataSizeDelta);
}
@Override public boolean compareAndSetDataSize(long expected, long updated) {
return dataSize.compareAndSet(expected,updated);
}
@Override
public long getDataSize() {
return dataSize.get();

View File

@ -376,11 +376,13 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
assertEquals(expected, actual);
}
@ -391,16 +393,20 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
ClassSize.estimateBase(AtomicBoolean.class,true);
assertEquals(expected, actual);
}
@ -411,6 +417,7 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
@ -418,6 +425,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
assertEquals(expected, actual);
}
@ -428,6 +436,7 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
if (expected != actual) {
@ -436,6 +445,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
assertEquals(expected, actual);
@ -446,6 +456,7 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(CellArrayMap.class, false);
if (expected != actual) {
@ -454,6 +465,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
ClassSize.estimateBase(CellArrayMap.class, true);
assertEquals(expected, actual);

View File

@ -838,7 +838,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
byte[] row = Bytes.toBytes(keys[i]);
byte[] val = Bytes.toBytes(keys[i] + i);
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
totalLen += kv.getLength();
totalLen += Segment.getCellLength(kv);
hmc.add(kv, null);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
}
@ -859,7 +859,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
Threads.sleep(1); // to make sure each kv gets a different ts
byte[] row = Bytes.toBytes(keys[i]);
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
totalLen += kv.getLength();
totalLen += Segment.getCellLength(kv);
hmc.add(kv, null);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
}

View File

@ -759,7 +759,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
// set memstore to flat into CellChunkMap
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
String.valueOf(compactionType));
((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
@ -798,11 +798,13 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
// One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
// totalCellsLen should remain the same
long oneCellOnCCMHeapSize =
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
(long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ numOfCells * oneCellOnCCMHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalCellsLen+ChunkCreator.SIZEOF_CHUNK_HEADER, regionServicesForStores
.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
MemStoreSize mss = memstore.getFlushableSize();
@ -826,8 +828,9 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
// but smaller than the size of two cells.
// Therefore, the two created cells are flattened together.
totalHeapSize = MutableSegment.DEEP_OVERHEAD
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ 2 * oneCellOnCCMHeapSize;
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ 1 * oneCellOnCSLMHeapSize
+ 1 * oneCellOnCCMHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
}
@ -850,6 +853,8 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().setInt(MemStoreCompactionStrategy
.COMPACTING_MEMSTORE_THRESHOLD_KEY, 4);
memstore.getConfiguration()
.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.014);
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
@ -862,39 +867,42 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
String bigVal = new String(chars);
byte[] val = Bytes.toBytes(bigVal);
// We need to add two cells, five times, in order to guarantee a merge
// We need to add two cells, three times, in order to guarantee a merge
List<String[]> keysList = new ArrayList<>();
keysList.add(new String[]{"A", "B"});
keysList.add(new String[]{"C", "D"});
keysList.add(new String[]{"E", "F"});
keysList.add(new String[]{"G", "H"});
keysList.add(new String[]{"I", "J"});
// Measuring the size of a single kv
KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val);
long oneCellOnCCMHeapSize =
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
(long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
long oneCellOnCSLMHeapSize =
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
for (int i = 0; i < 5; i++) {
for (int i = 0; i < keysList.size(); i++) {
addRowsByKeys(memstore, keysList.get(i), val);
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
// The in-memory flush size is bigger than the size of a single cell,
// but smaller than the size of two cells.
// Therefore, the two created cells are flattened together.
totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ 2 * oneCellOnCCMHeapSize;
if (i == 4) {
// Four out of the five are merged into one,
// and the segment becomes immutable
totalHeapSize -= (3 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ MutableSegment.DEEP_OVERHEAD);
if(i==0) {
totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ oneCellOnCCMHeapSize + oneCellOnCSLMHeapSize;
} else {
// The in-memory flush size is bigger than the size of a single cell,
// but smaller than the size of two cells.
// Therefore, the two created cells are flattened in a seperate segment.
totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize);
}
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
if (i == 2) {
// Four out of the five segments are merged into one
totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
totalHeapSize = ClassSize.align(totalHeapSize);
}
assertEquals("i="+i, totalHeapSize, ((CompactingMemStore) memstore).heapSize());
}
}

View File

@ -1714,15 +1714,12 @@ public class TestHStore {
}
@Override
protected boolean shouldFlushInMemory() {
boolean rval = super.shouldFlushInMemory();
if (rval) {
RUNNER_COUNT.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("runner count: " + RUNNER_COUNT.get());
}
void inMemoryCompaction() {
RUNNER_COUNT.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("runner count: " + RUNNER_COUNT.get());
}
return rval;
super.inMemoryCompaction();
}
}