HBASE-20542: Better heap utilization for IMC with MSLABs
This commit is contained in:
parent
112d050609
commit
d822ee3a7c
|
@ -47,7 +47,7 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
private final CellComparator comparator;
|
||||
|
||||
// active segment absorbs write operations
|
||||
protected volatile MutableSegment active;
|
||||
private volatile MutableSegment active;
|
||||
// Snapshot of memstore. Made for flusher.
|
||||
protected volatile ImmutableSegment snapshot;
|
||||
protected volatile long snapshotId;
|
||||
|
@ -82,8 +82,8 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
|
||||
protected void resetActive() {
|
||||
// Reset heap to not include any keys
|
||||
this.active = SegmentFactory.instance().createMutableSegment(conf, comparator);
|
||||
this.timeOfOldestEdit = Long.MAX_VALUE;
|
||||
active = SegmentFactory.instance().createMutableSegment(conf, comparator);
|
||||
timeOfOldestEdit = Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -102,12 +102,52 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
|
||||
@Override
|
||||
public void add(Cell cell, MemStoreSizing memstoreSizing) {
|
||||
Cell toAdd = maybeCloneWithAllocator(cell, false);
|
||||
doAddOrUpsert(cell, 0, memstoreSizing, true); }
|
||||
|
||||
/*
|
||||
* Inserts the specified Cell into MemStore and deletes any existing
|
||||
* versions of the same row/family/qualifier as the specified Cell.
|
||||
* <p>
|
||||
* First, the specified Cell is inserted into the Memstore.
|
||||
* <p>
|
||||
* If there are any existing Cell in this MemStore with the same row,
|
||||
* family, and qualifier, they are removed.
|
||||
* <p>
|
||||
* Callers must hold the read lock.
|
||||
*
|
||||
* @param cell the cell to be updated
|
||||
* @param readpoint readpoint below which we can safely remove duplicate KVs
|
||||
* @param memstoreSizing object to accumulate changed size
|
||||
*/
|
||||
private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
|
||||
doAddOrUpsert(cell, readpoint, memstoreSizing, false);
|
||||
}
|
||||
|
||||
private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing, boolean
|
||||
doAdd) {
|
||||
MutableSegment currentActive;
|
||||
boolean succ = false;
|
||||
while (!succ) {
|
||||
currentActive = getActive();
|
||||
succ = preUpdate(currentActive, cell, memstoreSizing);
|
||||
if (succ) {
|
||||
if(doAdd) {
|
||||
doAdd(currentActive, cell, memstoreSizing);
|
||||
} else {
|
||||
doUpsert(currentActive, cell, readpoint, memstoreSizing);
|
||||
}
|
||||
postUpdate(currentActive);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
|
||||
Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
|
||||
boolean mslabUsed = (toAdd != cell);
|
||||
// This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
|
||||
// default MSLAB is ON and we might have copied cell to MSLAB area. If not we must do below deep
|
||||
// copy. Or else we will keep referring to the bigger chunk of memory and prevent it from
|
||||
// getting GCed.
|
||||
// This cell data is backed by the same byte[] where we read request in RPC(See
|
||||
// HBASE-15180). By default MSLAB is ON and we might have copied cell to MSLAB area. If
|
||||
// not we must do below deep copy. Or else we will keep referring to the bigger chunk of
|
||||
// memory and prevent it from getting GCed.
|
||||
// Copy to MSLAB would not have happened if
|
||||
// 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
|
||||
// 2. When the size of the cell is bigger than the max size supported by MSLAB. See
|
||||
|
@ -116,9 +156,42 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
if (!mslabUsed) {
|
||||
toAdd = deepCopyIfNeeded(toAdd);
|
||||
}
|
||||
internalAdd(toAdd, mslabUsed, memstoreSizing);
|
||||
internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
|
||||
}
|
||||
|
||||
private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint, MemStoreSizing
|
||||
memstoreSizing) {
|
||||
// Add the Cell to the MemStore
|
||||
// Use the internalAdd method here since we (a) already have a lock
|
||||
// and (b) cannot safely use the MSLAB here without potentially
|
||||
// hitting OOME - see TestMemStore.testUpsertMSLAB for a
|
||||
// test that triggers the pathological case if we don't avoid MSLAB
|
||||
// here.
|
||||
// This cell data is backed by the same byte[] where we read request in RPC(See
|
||||
// HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
|
||||
// chunk of memory and prevent it from getting GCed.
|
||||
cell = deepCopyIfNeeded(cell);
|
||||
boolean sizeAddedPreOperation = sizeAddedPreOperation();
|
||||
currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
|
||||
setOldestEditTimeToNow();
|
||||
}
|
||||
|
||||
/**
|
||||
* Issue any synchronization and test needed before applying the update
|
||||
* @param currentActive the segment to be updated
|
||||
* @param cell the cell to be added
|
||||
* @param memstoreSizing object to accumulate region size changes
|
||||
* @return true iff can proceed with applying the update
|
||||
*/
|
||||
protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
|
||||
MemStoreSizing memstoreSizing);
|
||||
|
||||
/**
|
||||
* Issue any post update synchronization and tests
|
||||
* @param currentActive updated segment
|
||||
*/
|
||||
protected abstract void postUpdate(MutableSegment currentActive);
|
||||
|
||||
private static Cell deepCopyIfNeeded(Cell cell) {
|
||||
if (cell instanceof ExtendedCell) {
|
||||
return ((ExtendedCell) cell).deepClone();
|
||||
|
@ -188,42 +261,11 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
}
|
||||
|
||||
protected void dump(Logger log) {
|
||||
active.dump(log);
|
||||
getActive().dump(log);
|
||||
snapshot.dump(log);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Inserts the specified Cell into MemStore and deletes any existing
|
||||
* versions of the same row/family/qualifier as the specified Cell.
|
||||
* <p>
|
||||
* First, the specified Cell is inserted into the Memstore.
|
||||
* <p>
|
||||
* If there are any existing Cell in this MemStore with the same row,
|
||||
* family, and qualifier, they are removed.
|
||||
* <p>
|
||||
* Callers must hold the read lock.
|
||||
*
|
||||
* @param cell the cell to be updated
|
||||
* @param readpoint readpoint below which we can safely remove duplicate KVs
|
||||
* @param memstoreSize
|
||||
*/
|
||||
private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
|
||||
// Add the Cell to the MemStore
|
||||
// Use the internalAdd method here since we (a) already have a lock
|
||||
// and (b) cannot safely use the MSLAB here without potentially
|
||||
// hitting OOME - see TestMemStore.testUpsertMSLAB for a
|
||||
// test that triggers the pathological case if we don't avoid MSLAB
|
||||
// here.
|
||||
// This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). We
|
||||
// must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
|
||||
// prevent it from getting GCed.
|
||||
cell = deepCopyIfNeeded(cell);
|
||||
this.active.upsert(cell, readpoint, memstoreSizing);
|
||||
setOldestEditTimeToNow();
|
||||
checkActiveSize();
|
||||
}
|
||||
|
||||
/*
|
||||
* @param a
|
||||
* @param b
|
||||
|
@ -275,8 +317,9 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
* @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
|
||||
* @return either the given cell or its clone
|
||||
*/
|
||||
private Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) {
|
||||
return active.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
|
||||
private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell, boolean
|
||||
forceCloneOfBigCell) {
|
||||
return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -286,14 +329,17 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
* Callers should ensure they already have the read lock taken
|
||||
* @param toAdd the cell to add
|
||||
* @param mslabUsed whether using MSLAB
|
||||
* @param memstoreSize
|
||||
* @param memstoreSizing object to accumulate changed size
|
||||
*/
|
||||
private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) {
|
||||
active.add(toAdd, mslabUsed, memstoreSizing);
|
||||
private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean
|
||||
mslabUsed, MemStoreSizing memstoreSizing) {
|
||||
boolean sizeAddedPreOperation = sizeAddedPreOperation();
|
||||
currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
|
||||
setOldestEditTimeToNow();
|
||||
checkActiveSize();
|
||||
}
|
||||
|
||||
protected abstract boolean sizeAddedPreOperation();
|
||||
|
||||
private void setOldestEditTimeToNow() {
|
||||
if (timeOfOldestEdit == Long.MAX_VALUE) {
|
||||
timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
|
||||
|
@ -325,11 +371,6 @@ public abstract class AbstractMemStore implements MemStore {
|
|||
return snapshot;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether anything need to be done based on the current active set size
|
||||
*/
|
||||
protected abstract void checkActiveSize();
|
||||
|
||||
/**
|
||||
* @return an ordered list of segments from most recent to oldest in memstore
|
||||
*/
|
||||
|
|
|
@ -55,11 +55,12 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
|
|||
* of CSLMImmutableSegment
|
||||
* The given iterator returns the Cells that "survived" the compaction.
|
||||
*/
|
||||
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing,
|
||||
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing mss,
|
||||
MemStoreCompactionStrategy.Action action) {
|
||||
super(segment); // initiailize the upper class
|
||||
long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
|
||||
incMemStoreSize(0, indexOverhead, 0); // CAM is always on-heap
|
||||
mss.incMemStoreSize(0, indexOverhead, 0);
|
||||
int numOfCells = segment.getCellsCount();
|
||||
// build the new CellSet based on CellChunkMap and update the CellSet of this Segment
|
||||
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
|
||||
|
@ -68,7 +69,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
|
|||
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
|
||||
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
|
||||
incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
mss.incMemStoreSize(0, newSegmentSizeDelta, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -80,13 +80,16 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
// initiate the heapSize with the size of the segment metadata
|
||||
if(onHeap) {
|
||||
incMemStoreSize(0, indexOverhead, 0);
|
||||
memstoreSizing.incMemStoreSize(0, indexOverhead, 0);
|
||||
} else {
|
||||
incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM);
|
||||
memstoreSizing.incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM,
|
||||
DEEP_OVERHEAD_CCM);
|
||||
}
|
||||
int numOfCells = segment.getCellsCount();
|
||||
// build the new CellSet based on CellChunkMap
|
||||
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
|
||||
action);
|
||||
memstoreSizing, action);
|
||||
// arrange the meta-data size, decrease all meta-data sizes related to SkipList;
|
||||
// add sizes of CellChunkMap entry, decrease also Cell object sizes
|
||||
// (reinitializeCellSet doesn't take the care for the sizes)
|
||||
|
@ -150,7 +153,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
// CellChunkMap assumes all cells are allocated on MSLAB.
|
||||
// Therefore, cells which are not allocated on MSLAB initially,
|
||||
// are copied into MSLAB here.
|
||||
c = copyCellIntoMSLAB(c);
|
||||
c = copyCellIntoMSLAB(c, null); //no memstore sizing object to update
|
||||
alreadyCopied = true;
|
||||
}
|
||||
if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunks[currentChunkIdx].size) {
|
||||
|
@ -197,7 +200,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
// This is a service for not-flat immutable segments
|
||||
private void reinitializeCellSet(
|
||||
int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
|
||||
MemStoreCompactionStrategy.Action action) {
|
||||
MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
|
||||
Cell curCell;
|
||||
Chunk[] chunks = allocIndexChunks(numOfCells);
|
||||
|
||||
|
@ -213,7 +216,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
// CellChunkMap assumes all cells are allocated on MSLAB.
|
||||
// Therefore, cells which are not allocated on MSLAB initially,
|
||||
// are copied into MSLAB here.
|
||||
curCell = copyCellIntoMSLAB(curCell);
|
||||
curCell = copyCellIntoMSLAB(curCell, memstoreSizing);
|
||||
}
|
||||
if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunks[currentChunkIdx].size) {
|
||||
// continue to the next metadata chunk
|
||||
|
@ -315,7 +318,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
return chunks;
|
||||
}
|
||||
|
||||
private Cell copyCellIntoMSLAB(Cell cell) {
|
||||
private Cell copyCellIntoMSLAB(Cell cell, MemStoreSizing memstoreSizing) {
|
||||
// Take care for a special case when a cell is copied from on-heap to (probably off-heap) MSLAB.
|
||||
// The cell allocated as an on-heap JVM object (byte array) occupies slightly different
|
||||
// amount of memory, than when the cell serialized and allocated on the MSLAB.
|
||||
|
@ -332,8 +335,10 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
long newCellSize = getCellLength(cell);
|
||||
long heapOverhead = newHeapSize - oldHeapSize;
|
||||
long offHeapOverhead = newOffHeapSize - oldOffHeapSize;
|
||||
//TODO: maybe need to update the dataSize of the region
|
||||
incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
|
||||
if(memstoreSizing != null) {
|
||||
memstoreSizing.incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
|
||||
}
|
||||
return cell;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
// Default fraction of in-memory-flush size w.r.t. flush-to-disk size
|
||||
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
|
||||
"hbase.memstore.inmemoryflush.threshold.factor";
|
||||
private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014;
|
||||
private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
|
||||
private HStore store;
|
||||
|
@ -71,7 +71,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
protected MemStoreCompactor compactor;
|
||||
|
||||
private long inmemoryFlushSize; // the threshold on active size for in-memory flush
|
||||
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
|
||||
private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false);
|
||||
|
||||
// inWalReplay is true while we are synchronously replaying the edits from WAL
|
||||
private boolean inWalReplay = false;
|
||||
|
@ -94,11 +94,11 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
|
||||
public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
|
||||
+ 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
|
||||
// MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
|
||||
// indexType
|
||||
// MemStoreCompactor, inMemoryCompactionInProgress,
|
||||
// allowCompaction, indexType
|
||||
+ Bytes.SIZEOF_LONG // inmemoryFlushSize
|
||||
+ 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
|
||||
+ 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
|
||||
+ 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and allowCompaction
|
||||
+ CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
|
||||
|
||||
public CompactingMemStore(Configuration conf, CellComparator c,
|
||||
|
@ -139,12 +139,15 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
// Family number might also be zero in some of our unit test case
|
||||
numStores = 1;
|
||||
}
|
||||
inmemoryFlushSize = memstoreFlushSize / numStores;
|
||||
// multiply by a factor (the same factor for all index types)
|
||||
factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
|
||||
IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT);
|
||||
|
||||
inmemoryFlushSize = (long) (inmemoryFlushSize * factor);
|
||||
factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0);
|
||||
if(factor != 0.0) {
|
||||
// multiply by a factor (the same factor for all index types)
|
||||
inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores;
|
||||
} else {
|
||||
inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER *
|
||||
conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
|
||||
inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -156,7 +159,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
@Override
|
||||
public MemStoreSize size() {
|
||||
MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
memstoreSizing.incMemStoreSize(active.getMemStoreSize());
|
||||
memstoreSizing.incMemStoreSize(getActive().getMemStoreSize());
|
||||
for (Segment item : pipeline.getSegments()) {
|
||||
memstoreSizing.incMemStoreSize(item.getMemStoreSize());
|
||||
}
|
||||
|
@ -201,9 +204,11 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
"Doing nothing. Another ongoing flush or did we fail last attempt?");
|
||||
} else {
|
||||
LOG.debug("FLUSHING TO DISK {}, store={}",
|
||||
getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
|
||||
getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
|
||||
stopCompaction();
|
||||
pushActiveToPipeline(this.active);
|
||||
// region level lock ensures pushing active to pipeline is done in isolation
|
||||
// no concurrent update operations trying to flush the active segment
|
||||
pushActiveToPipeline(getActive());
|
||||
snapshotId = EnvironmentEdgeManager.currentTime();
|
||||
// in both cases whatever is pushed to snapshot is cleared from the pipeline
|
||||
if (compositeSnapshot) {
|
||||
|
@ -223,19 +228,22 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
|
||||
if (compositeSnapshot) {
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
|
||||
memStoreSizing.incMemStoreSize(this.active.getMemStoreSize());
|
||||
MutableSegment currActive = getActive();
|
||||
if(!currActive.isEmpty()) {
|
||||
memStoreSizing.incMemStoreSize(currActive.getMemStoreSize());
|
||||
}
|
||||
mss = memStoreSizing.getMemStoreSize();
|
||||
} else {
|
||||
mss = pipeline.getTailSize();
|
||||
}
|
||||
}
|
||||
return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
|
||||
return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long keySize() {
|
||||
// Need to consider dataSize/keySize of all segments in pipeline and active
|
||||
long keySize = this.active.getDataSize();
|
||||
long keySize = getActive().getDataSize();
|
||||
for (Segment segment : this.pipeline.getSegments()) {
|
||||
keySize += segment.getDataSize();
|
||||
}
|
||||
|
@ -245,7 +253,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
@Override
|
||||
protected long heapSize() {
|
||||
// Need to consider heapOverhead of all segments in pipeline and active
|
||||
long h = this.active.getHeapSize();
|
||||
long h = getActive().getHeapSize();
|
||||
for (Segment segment : this.pipeline.getSegments()) {
|
||||
h += segment.getHeapSize();
|
||||
}
|
||||
|
@ -283,15 +291,43 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
inWalReplay = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Issue any synchronization and test needed before applying the update
|
||||
* For compacting memstore this means checking the update can increase the size without
|
||||
* overflow
|
||||
* @param currentActive the segment to be updated
|
||||
* @param cell the cell to be added
|
||||
* @param memstoreSizing object to accumulate region size changes
|
||||
* @return true iff can proceed with applying the update
|
||||
*/
|
||||
@Override protected boolean preUpdate(MutableSegment currentActive, Cell cell,
|
||||
MemStoreSizing memstoreSizing) {
|
||||
if(currentActive.sharedLock()) {
|
||||
if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) {
|
||||
return true;
|
||||
}
|
||||
currentActive.sharedUnlock();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override protected void postUpdate(MutableSegment currentActive) {
|
||||
currentActive.sharedUnlock();
|
||||
}
|
||||
|
||||
@Override protected boolean sizeAddedPreOperation() {
|
||||
return true;
|
||||
}
|
||||
|
||||
// the getSegments() method is used for tests only
|
||||
@VisibleForTesting
|
||||
@Override
|
||||
protected List<Segment> getSegments() {
|
||||
List<? extends Segment> pipelineList = pipeline.getSegments();
|
||||
List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
|
||||
list.add(this.active);
|
||||
list.add(getActive());
|
||||
list.addAll(pipelineList);
|
||||
list.addAll(this.snapshot.getAllSegments());
|
||||
list.addAll(snapshot.getAllSegments());
|
||||
|
||||
return list;
|
||||
}
|
||||
|
@ -351,7 +387,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
|
||||
@Override
|
||||
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
|
||||
MutableSegment activeTmp = active;
|
||||
MutableSegment activeTmp = getActive();
|
||||
List<? extends Segment> pipelineList = pipeline.getSegments();
|
||||
List<? extends Segment> snapshotList = snapshot.getAllSegments();
|
||||
long numberOfSegments = 1L + pipelineList.size() + snapshotList.size();
|
||||
|
@ -363,56 +399,67 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
return list;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected List<KeyValueScanner> createList(int capacity) {
|
||||
return new ArrayList<>(capacity);
|
||||
}
|
||||
@VisibleForTesting
|
||||
protected List<KeyValueScanner> createList(int capacity) {
|
||||
return new ArrayList<>(capacity);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether anything need to be done based on the current active set size.
|
||||
* The method is invoked upon every addition to the active set.
|
||||
* For CompactingMemStore, flush the active set to the read-only memory if it's
|
||||
* size is above threshold
|
||||
* @param currActive intended segment to update
|
||||
* @param cellToAdd cell to be added to the segment
|
||||
* @param memstoreSizing object to accumulate changed size
|
||||
* @return true if the cell can be added to the
|
||||
*/
|
||||
@Override
|
||||
protected void checkActiveSize() {
|
||||
if (shouldFlushInMemory()) {
|
||||
/* The thread is dispatched to flush-in-memory. This cannot be done
|
||||
* on the same thread, because for flush-in-memory we require updatesLock
|
||||
* in exclusive mode while this method (checkActiveSize) is invoked holding updatesLock
|
||||
* in the shared mode. */
|
||||
InMemoryFlushRunnable runnable = new InMemoryFlushRunnable();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(
|
||||
"Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
|
||||
private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
|
||||
MemStoreSizing memstoreSizing) {
|
||||
if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
|
||||
if (currActive.setInMemoryFlushed()) {
|
||||
flushInMemory(currActive);
|
||||
if (inMemoryCompactionInProgress.compareAndSet(false, true)) {
|
||||
// The thread is dispatched to do in-memory compaction in the background
|
||||
InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Dispatching the MemStore in-memory flush for store " + store
|
||||
.getColumnFamilyName());
|
||||
}
|
||||
getPool().execute(runnable);
|
||||
}
|
||||
}
|
||||
getPool().execute(runnable);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// internally used method, externally visible only for tests
|
||||
// externally visible only for tests
|
||||
// when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
|
||||
// otherwise there is a deadlock
|
||||
@VisibleForTesting
|
||||
void flushInMemory() throws IOException {
|
||||
// setting the inMemoryFlushInProgress flag again for the case this method is invoked
|
||||
// directly (only in tests) in the common path setting from true to true is idempotent
|
||||
inMemoryFlushInProgress.set(true);
|
||||
try {
|
||||
// Phase I: Update the pipeline
|
||||
getRegionServices().blockUpdates();
|
||||
try {
|
||||
LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
|
||||
pushActiveToPipeline(this.active);
|
||||
} finally {
|
||||
getRegionServices().unblockUpdates();
|
||||
}
|
||||
void flushInMemory() {
|
||||
MutableSegment currActive = getActive();
|
||||
if(currActive.setInMemoryFlushed()) {
|
||||
flushInMemory(currActive);
|
||||
}
|
||||
inMemoryCompaction();
|
||||
}
|
||||
|
||||
private void flushInMemory(MutableSegment currActive) {
|
||||
LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
|
||||
pushActiveToPipeline(currActive);
|
||||
}
|
||||
|
||||
void inMemoryCompaction() {
|
||||
// setting the inMemoryCompactionInProgress flag again for the case this method is invoked
|
||||
// directly (only in tests) in the common path setting from true to true is idempotent
|
||||
inMemoryCompactionInProgress.set(true);
|
||||
try {
|
||||
// Used by tests
|
||||
if (!allowCompaction.get()) {
|
||||
return;
|
||||
}
|
||||
// Phase II: Compact the pipeline
|
||||
try {
|
||||
// Speculative compaction execution, may be interrupted if flush is forced while
|
||||
// compaction is in progress
|
||||
|
@ -422,8 +469,7 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
|
||||
}
|
||||
} finally {
|
||||
inMemoryFlushInProgress.set(false);
|
||||
LOG.trace("IN-MEMORY FLUSH: end");
|
||||
inMemoryCompactionInProgress.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -442,16 +488,24 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected boolean shouldFlushInMemory() {
|
||||
if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold
|
||||
if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush
|
||||
return false; // regardless the size
|
||||
protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
|
||||
MemStoreSizing memstoreSizing) {
|
||||
long cellSize = currActive.getCellLength(cellToAdd);
|
||||
long segmentDataSize = currActive.getDataSize();
|
||||
while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
|
||||
// when replaying edits from WAL there is no need in in-memory flush regardless the size
|
||||
// otherwise size below flush threshold try to update atomically
|
||||
if(currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
|
||||
if(memstoreSizing != null){
|
||||
memstoreSizing.incMemStoreSize(cellSize, 0, 0);
|
||||
}
|
||||
//enough space for cell - no need to flush
|
||||
return false;
|
||||
}
|
||||
// the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude
|
||||
// the insert of the active into the compaction pipeline
|
||||
return (inMemoryFlushInProgress.compareAndSet(false,true));
|
||||
segmentDataSize = currActive.getDataSize();
|
||||
}
|
||||
return false;
|
||||
// size above flush threshold
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -460,14 +514,14 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
* Non-blocking request
|
||||
*/
|
||||
private void stopCompaction() {
|
||||
if (inMemoryFlushInProgress.get()) {
|
||||
if (inMemoryCompactionInProgress.get()) {
|
||||
compactor.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected void pushActiveToPipeline(MutableSegment active) {
|
||||
if (!active.isEmpty()) {
|
||||
pipeline.pushHead(active);
|
||||
protected void pushActiveToPipeline(MutableSegment currActive) {
|
||||
if (!currActive.isEmpty()) {
|
||||
pipeline.pushHead(currActive);
|
||||
resetActive();
|
||||
}
|
||||
}
|
||||
|
@ -518,28 +572,21 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
}
|
||||
|
||||
/**
|
||||
* The in-memory-flusher thread performs the flush asynchronously.
|
||||
* There is at most one thread per memstore instance.
|
||||
* It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
|
||||
* and compacts the pipeline.
|
||||
*/
|
||||
private class InMemoryFlushRunnable implements Runnable {
|
||||
|
||||
* The in-memory-flusher thread performs the flush asynchronously.
|
||||
* There is at most one thread per memstore instance.
|
||||
* It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
|
||||
* and compacts the pipeline.
|
||||
*/
|
||||
private class InMemoryCompactionRunnable implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
flushInMemory();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to run memstore compaction. region "
|
||||
+ getRegionServices().getRegionInfo().getRegionNameAsString()
|
||||
+ "store: "+ getFamilyName(), e);
|
||||
}
|
||||
inMemoryCompaction();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean isMemStoreFlushingInMemory() {
|
||||
return inMemoryFlushInProgress.get();
|
||||
return inMemoryCompactionInProgress.get();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -567,10 +614,10 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
|
||||
// debug method
|
||||
public void debug() {
|
||||
String msg = "active size=" + this.active.getDataSize();
|
||||
msg += " in-memory flush size is "+ inmemoryFlushSize;
|
||||
String msg = "active size=" + getActive().getDataSize();
|
||||
msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
|
||||
msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");
|
||||
msg += " inMemoryCompactionInProgress is "+ (inMemoryCompactionInProgress.get() ? "true" :
|
||||
"false");
|
||||
LOG.debug(msg);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.Iterator;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -151,15 +150,10 @@ public class CompactionPipeline {
|
|||
long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
|
||||
long heapSizeDelta = suffixHeapSize - newHeapSize;
|
||||
region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta);
|
||||
LOG.debug("Suffix data size={}, new segment data size={}, "
|
||||
+ "suffix heap size={}," + "new segment heap size={}"
|
||||
+ "suffix off heap size={}," + "new segment off heap size={}"
|
||||
, suffixDataSize
|
||||
, newDataSize
|
||||
, suffixHeapSize
|
||||
, newHeapSize
|
||||
, suffixOffHeapSize
|
||||
, newOffHeapSize);
|
||||
LOG.debug("Suffix data size={}, new segment data size={}, " + "suffix heap size={},"
|
||||
+ "new segment heap size={}" + "suffix off heap size={},"
|
||||
+ "new segment off heap size={}", suffixDataSize, newDataSize, suffixHeapSize,
|
||||
newHeapSize, suffixOffHeapSize, newOffHeapSize);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -214,6 +208,7 @@ public class CompactionPipeline {
|
|||
int i = 0;
|
||||
for (ImmutableSegment s : pipeline) {
|
||||
if ( s.canBeFlattened() ) {
|
||||
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
|
||||
// size to be updated
|
||||
MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
|
||||
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
|
||||
|
@ -223,7 +218,6 @@ public class CompactionPipeline {
|
|||
// Update the global memstore size counter upon flattening there is no change in the
|
||||
// data size
|
||||
MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
|
||||
Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!");
|
||||
region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize());
|
||||
}
|
||||
LOG.debug("Compaction pipeline segment {} flattened", s);
|
||||
|
|
|
@ -239,13 +239,14 @@ public class CompositeImmutableSegment extends ImmutableSegment {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) {
|
||||
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing,
|
||||
boolean sizeAddedPreOperation) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
|
||||
MemStoreSizing memstoreSizing) {
|
||||
MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) {
|
||||
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
|
||||
}
|
||||
|
||||
|
|
|
@ -87,9 +87,9 @@ public class DefaultMemStore extends AbstractMemStore {
|
|||
"Doing nothing. Another ongoing flush or did we fail last attempt?");
|
||||
} else {
|
||||
this.snapshotId = EnvironmentEdgeManager.currentTime();
|
||||
if (!this.active.isEmpty()) {
|
||||
if (!getActive().isEmpty()) {
|
||||
ImmutableSegment immutableSegment = SegmentFactory.instance().
|
||||
createImmutableSegment(this.active);
|
||||
createImmutableSegment(getActive());
|
||||
this.snapshot = immutableSegment;
|
||||
resetActive();
|
||||
}
|
||||
|
@ -100,17 +100,17 @@ public class DefaultMemStore extends AbstractMemStore {
|
|||
@Override
|
||||
public MemStoreSize getFlushableSize() {
|
||||
MemStoreSize mss = getSnapshotSize();
|
||||
return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
|
||||
return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long keySize() {
|
||||
return this.active.getDataSize();
|
||||
return getActive().getDataSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long heapSize() {
|
||||
return this.active.getHeapSize();
|
||||
return getActive().getHeapSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -119,7 +119,7 @@ public class DefaultMemStore extends AbstractMemStore {
|
|||
*/
|
||||
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
|
||||
List<KeyValueScanner> list = new ArrayList<>();
|
||||
addToScanners(active, readPt, list);
|
||||
addToScanners(getActive(), readPt, list);
|
||||
addToScanners(snapshot.getAllSegments(), readPt, list);
|
||||
return list;
|
||||
}
|
||||
|
@ -127,8 +127,8 @@ public class DefaultMemStore extends AbstractMemStore {
|
|||
@Override
|
||||
protected List<Segment> getSegments() throws IOException {
|
||||
List<Segment> list = new ArrayList<>(2);
|
||||
list.add(this.active);
|
||||
list.add(this.snapshot);
|
||||
list.add(getActive());
|
||||
list.add(snapshot);
|
||||
return list;
|
||||
}
|
||||
|
||||
|
@ -139,27 +139,31 @@ public class DefaultMemStore extends AbstractMemStore {
|
|||
*/
|
||||
Cell getNextRow(final Cell cell) {
|
||||
return getLowest(
|
||||
getNextRow(cell, this.active.getCellSet()),
|
||||
getNextRow(cell, this.getActive().getCellSet()),
|
||||
getNextRow(cell, this.snapshot.getCellSet()));
|
||||
}
|
||||
|
||||
@Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MemStoreSize size() {
|
||||
return active.getMemStoreSize();
|
||||
@Override protected boolean preUpdate(MutableSegment currentActive, Cell cell,
|
||||
MemStoreSizing memstoreSizing) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether anything need to be done based on the current active set size
|
||||
* Nothing need to be done for the DefaultMemStore
|
||||
*/
|
||||
@Override
|
||||
protected void checkActiveSize() {
|
||||
@Override protected void postUpdate(MutableSegment currentActive) {
|
||||
return;
|
||||
}
|
||||
|
||||
@Override protected boolean sizeAddedPreOperation() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MemStoreSize size() {
|
||||
return getActive().getMemStoreSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long preFlushSeqIDEstimation() {
|
||||
return HConstants.NO_SEQNUM;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -207,10 +208,14 @@ public class MemStoreCompactor {
|
|||
|
||||
ImmutableSegment result = null;
|
||||
MemStoreSegmentsIterator iterator = null;
|
||||
List<ImmutableSegment> segments = versionedList.getStoreSegments();
|
||||
for (ImmutableSegment s : segments) {
|
||||
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
|
||||
}
|
||||
|
||||
switch (action) {
|
||||
case COMPACT:
|
||||
iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
|
||||
iterator = new MemStoreCompactorSegmentsIterator(segments,
|
||||
compactingMemStore.getComparator(),
|
||||
compactionKVMax, compactingMemStore.getStore());
|
||||
|
||||
|
@ -222,13 +227,12 @@ public class MemStoreCompactor {
|
|||
case MERGE:
|
||||
case MERGE_COUNT_UNIQUE_KEYS:
|
||||
iterator =
|
||||
new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
|
||||
new MemStoreMergerSegmentsIterator(segments,
|
||||
compactingMemStore.getComparator(), compactionKVMax);
|
||||
|
||||
result = SegmentFactory.instance().createImmutableSegmentByMerge(
|
||||
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
|
||||
versionedList.getNumOfCells(), versionedList.getStoreSegments(),
|
||||
compactingMemStore.getIndexType(), action);
|
||||
versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action);
|
||||
iterator.close();
|
||||
break;
|
||||
default:
|
||||
|
|
|
@ -120,8 +120,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
*/
|
||||
@Override
|
||||
public Cell forceCopyOfBigCellInto(Cell cell) {
|
||||
int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
|
||||
KeyValueUtil.length(cell);
|
||||
int size = Segment.getCellLength(cell);
|
||||
size += ChunkCreator.SIZEOF_CHUNK_HEADER;
|
||||
Preconditions.checkArgument(size >= 0, "negative size");
|
||||
if (size <= dataChunkSize) {
|
||||
|
@ -135,8 +134,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
}
|
||||
|
||||
private Cell copyCellInto(Cell cell, int maxAlloc) {
|
||||
int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
|
||||
KeyValueUtil.length(cell);
|
||||
int size = Segment.getCellLength(cell);
|
||||
Preconditions.checkArgument(size >= 0, "negative size");
|
||||
// Callers should satisfy large allocations directly from JVM since they
|
||||
// don't cause fragmentation as badly.
|
||||
|
|
|
@ -81,6 +81,10 @@ public interface MemStoreSizing {
|
|||
long offHeapSizeDelta) {
|
||||
throw new RuntimeException("I'm a DUD, you can't use me!");
|
||||
}
|
||||
|
||||
@Override public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
throw new RuntimeException("I'm a DUD, you can't use me!");
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -104,6 +108,8 @@ public interface MemStoreSizing {
|
|||
return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), -delta.getOffHeapSize());
|
||||
}
|
||||
|
||||
boolean compareAndSetDataSize(long expected, long updated);
|
||||
|
||||
long getDataSize();
|
||||
long getHeapSize();
|
||||
long getOffHeapSize();
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import java.util.Iterator;
|
||||
import java.util.SortedSet;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
|
@ -38,9 +39,13 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
@InterfaceAudience.Private
|
||||
public class MutableSegment extends Segment {
|
||||
|
||||
public final static long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
|
||||
+ ClassSize.CONCURRENT_SKIPLISTMAP
|
||||
+ ClassSize.SYNC_TIMERANGE_TRACKER;
|
||||
private final AtomicBoolean flushed = new AtomicBoolean(false);
|
||||
|
||||
public final static long DEEP_OVERHEAD = ClassSize.align(Segment.DEEP_OVERHEAD
|
||||
+ ClassSize.CONCURRENT_SKIPLISTMAP
|
||||
+ ClassSize.SYNC_TIMERANGE_TRACKER
|
||||
+ ClassSize.REFERENCE
|
||||
+ ClassSize.ATOMIC_BOOLEAN);
|
||||
|
||||
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
|
||||
super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
|
||||
|
@ -52,12 +57,14 @@ public class MutableSegment extends Segment {
|
|||
* @param cell the cell to add
|
||||
* @param mslabUsed whether using MSLAB
|
||||
*/
|
||||
public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing) {
|
||||
internalAdd(cell, mslabUsed, memStoreSizing);
|
||||
public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing,
|
||||
boolean sizeAddedPreOperation) {
|
||||
internalAdd(cell, mslabUsed, memStoreSizing, sizeAddedPreOperation);
|
||||
}
|
||||
|
||||
public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing) {
|
||||
internalAdd(cell, false, memStoreSizing);
|
||||
public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing,
|
||||
boolean sizeAddedPreOperation) {
|
||||
internalAdd(cell, false, memStoreSizing, sizeAddedPreOperation);
|
||||
|
||||
// Get the Cells for the row/family/qualifier regardless of timestamp.
|
||||
// For this case we want to clean up any other puts
|
||||
|
@ -105,6 +112,10 @@ public class MutableSegment extends Segment {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean setInMemoryFlushed() {
|
||||
return flushed.compareAndSet(false, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the first cell in the segment
|
||||
* @return the first cell in the segment
|
||||
|
|
|
@ -59,6 +59,14 @@ class NonThreadSafeMemStoreSizing implements MemStoreSizing {
|
|||
return this.dataSize;
|
||||
}
|
||||
|
||||
@Override public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
if(dataSize == expected) {
|
||||
dataSize = updated;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDataSize() {
|
||||
return dataSize;
|
||||
|
|
|
@ -57,14 +57,6 @@ public class RegionServicesForStores {
|
|||
this.region = region;
|
||||
}
|
||||
|
||||
public void blockUpdates() {
|
||||
region.blockUpdates();
|
||||
}
|
||||
|
||||
public void unblockUpdates() {
|
||||
region.unblockUpdates();
|
||||
}
|
||||
|
||||
public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
|
||||
region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
|
||||
}
|
||||
|
|
|
@ -24,9 +24,11 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
import java.util.SortedSet;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.ExtendedCell;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
|
@ -48,15 +50,17 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
public abstract class Segment implements MemStoreSizing {
|
||||
|
||||
public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
|
||||
+ 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, memStoreSizing,
|
||||
+ 6 * ClassSize.REFERENCE // cellSet, comparator, updatesLock, memStoreLAB, memStoreSizing,
|
||||
// and timeRangeTracker
|
||||
+ Bytes.SIZEOF_LONG // minSequenceId
|
||||
+ Bytes.SIZEOF_BOOLEAN); // tagsPresent
|
||||
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE
|
||||
+ ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG;
|
||||
+ ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG
|
||||
+ ClassSize.REENTRANT_LOCK;
|
||||
|
||||
private AtomicReference<CellSet> cellSet= new AtomicReference<>();
|
||||
private final CellComparator comparator;
|
||||
private ReentrantReadWriteLock updatesLock;
|
||||
protected long minSequenceId;
|
||||
private MemStoreLAB memStoreLAB;
|
||||
// Sum of sizes of all Cells added to this Segment. Cell's HeapSize is considered. This is not
|
||||
|
@ -87,6 +91,7 @@ public abstract class Segment implements MemStoreSizing {
|
|||
OffHeapSize += memStoreSize.getOffHeapSize();
|
||||
}
|
||||
this.comparator = comparator;
|
||||
this.updatesLock = new ReentrantReadWriteLock();
|
||||
// Do we need to be thread safe always? What if ImmutableSegment?
|
||||
// DITTO for the TimeRangeTracker below.
|
||||
this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, OffHeapSize);
|
||||
|
@ -97,6 +102,7 @@ public abstract class Segment implements MemStoreSizing {
|
|||
protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, TimeRangeTracker trt) {
|
||||
this.cellSet.set(cellSet);
|
||||
this.comparator = comparator;
|
||||
this.updatesLock = new ReentrantReadWriteLock();
|
||||
this.minSequenceId = Long.MAX_VALUE;
|
||||
this.memStoreLAB = memStoreLAB;
|
||||
// Do we need to be thread safe always? What if ImmutableSegment?
|
||||
|
@ -109,9 +115,10 @@ public abstract class Segment implements MemStoreSizing {
|
|||
protected Segment(Segment segment) {
|
||||
this.cellSet.set(segment.getCellSet());
|
||||
this.comparator = segment.getComparator();
|
||||
this.updatesLock = segment.getUpdatesLock();
|
||||
this.minSequenceId = segment.getMinSequenceId();
|
||||
this.memStoreLAB = segment.getMemStoreLAB();
|
||||
this.memStoreSizing = new ThreadSafeMemStoreSizing(segment.memStoreSizing.getMemStoreSize());
|
||||
this.memStoreSizing = segment.memStoreSizing;
|
||||
this.tagsPresent = segment.isTagsPresent();
|
||||
this.timeRangeTracker = segment.getTimeRangeTracker();
|
||||
}
|
||||
|
@ -183,7 +190,8 @@ public abstract class Segment implements MemStoreSizing {
|
|||
*/
|
||||
@VisibleForTesting
|
||||
static int getCellLength(Cell cell) {
|
||||
return KeyValueUtil.length(cell);
|
||||
return cell instanceof ExtendedCell ? ((ExtendedCell)cell).getSerializedSize():
|
||||
KeyValueUtil.length(cell);
|
||||
}
|
||||
|
||||
public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
|
||||
|
@ -244,6 +252,25 @@ public abstract class Segment implements MemStoreSizing {
|
|||
return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
|
||||
}
|
||||
|
||||
public boolean sharedLock() {
|
||||
return updatesLock.readLock().tryLock();
|
||||
}
|
||||
|
||||
public void sharedUnlock() {
|
||||
updatesLock.readLock().unlock();
|
||||
}
|
||||
|
||||
public void waitForUpdates() {
|
||||
if(!updatesLock.isWriteLocked()) {
|
||||
updatesLock.writeLock().lock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
return memStoreSizing.compareAndSetDataSize(expected, updated);
|
||||
}
|
||||
|
||||
public long getMinSequenceId() {
|
||||
return minSequenceId;
|
||||
}
|
||||
|
@ -288,25 +315,30 @@ public abstract class Segment implements MemStoreSizing {
|
|||
return comparator;
|
||||
}
|
||||
|
||||
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) {
|
||||
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing,
|
||||
boolean sizeAddedPreOperation) {
|
||||
boolean succ = getCellSet().add(cell);
|
||||
updateMetaInfo(cell, succ, mslabUsed, memstoreSizing);
|
||||
updateMetaInfo(cell, succ, mslabUsed, memstoreSizing, sizeAddedPreOperation);
|
||||
}
|
||||
|
||||
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
|
||||
MemStoreSizing memstoreSizing) {
|
||||
long cellSize = 0;
|
||||
MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) {
|
||||
long delta = 0;
|
||||
long cellSize = getCellLength(cellToAdd);
|
||||
// If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
|
||||
// MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
|
||||
// than the counted number)
|
||||
if (succ || mslabUsed) {
|
||||
cellSize = getCellLength(cellToAdd);
|
||||
delta = cellSize;
|
||||
}
|
||||
long heapSize = heapSizeChange(cellToAdd, succ);
|
||||
long offHeapSize = offHeapSizeChange(cellToAdd, succ);
|
||||
incMemStoreSize(cellSize, heapSize, offHeapSize);
|
||||
if(sizeAddedPreOperation) {
|
||||
delta -= cellSize;
|
||||
}
|
||||
long heapSize = heapSizeChange(cellToAdd, succ || mslabUsed);
|
||||
long offHeapSize = offHeapSizeChange(cellToAdd, succ || mslabUsed);
|
||||
incMemStoreSize(delta, heapSize, offHeapSize);
|
||||
if (memstoreSizing != null) {
|
||||
memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize);
|
||||
memstoreSizing.incMemStoreSize(delta, heapSize, offHeapSize);
|
||||
}
|
||||
getTimeRangeTracker().includeTimestamp(cellToAdd);
|
||||
minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
|
||||
|
@ -320,16 +352,16 @@ public abstract class Segment implements MemStoreSizing {
|
|||
}
|
||||
|
||||
protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSizing memstoreSizing) {
|
||||
updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing);
|
||||
updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The increase in heap size because of this cell addition. This includes this cell POJO's
|
||||
* heap size itself and additional overhead because of addition on to CSLM.
|
||||
*/
|
||||
protected long heapSizeChange(Cell cell, boolean succ) {
|
||||
protected long heapSizeChange(Cell cell, boolean allocated) {
|
||||
long res = 0;
|
||||
if (succ) {
|
||||
if (allocated) {
|
||||
boolean onHeap = true;
|
||||
MemStoreLAB memStoreLAB = getMemStoreLAB();
|
||||
if(memStoreLAB != null) {
|
||||
|
@ -344,9 +376,9 @@ public abstract class Segment implements MemStoreSizing {
|
|||
return res;
|
||||
}
|
||||
|
||||
protected long offHeapSizeChange(Cell cell, boolean succ) {
|
||||
protected long offHeapSizeChange(Cell cell, boolean allocated) {
|
||||
long res = 0;
|
||||
if (succ) {
|
||||
if (allocated) {
|
||||
boolean offHeap = false;
|
||||
MemStoreLAB memStoreLAB = getMemStoreLAB();
|
||||
if(memStoreLAB != null) {
|
||||
|
@ -410,4 +442,8 @@ public abstract class Segment implements MemStoreSizing {
|
|||
res += "max timestamp=" + timeRangeTracker.getMax();
|
||||
return res;
|
||||
}
|
||||
|
||||
private ReentrantReadWriteLock getUpdatesLock() {
|
||||
return updatesLock;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,6 +58,10 @@ class ThreadSafeMemStoreSizing implements MemStoreSizing {
|
|||
return this.dataSize.addAndGet(dataSizeDelta);
|
||||
}
|
||||
|
||||
@Override public boolean compareAndSetDataSize(long expected, long updated) {
|
||||
return dataSize.compareAndSet(expected,updated);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDataSize() {
|
||||
return dataSize.get();
|
||||
|
|
|
@ -376,11 +376,13 @@ public class TestHeapSize {
|
|||
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
|
||||
expected += ClassSize.estimateBase(AtomicReference.class, false);
|
||||
expected += ClassSize.estimateBase(CellSet.class, false);
|
||||
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
|
||||
if (expected != actual) {
|
||||
ClassSize.estimateBase(cl, true);
|
||||
ClassSize.estimateBase(AtomicLong.class, true);
|
||||
ClassSize.estimateBase(AtomicReference.class, true);
|
||||
ClassSize.estimateBase(CellSet.class, true);
|
||||
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
|
@ -391,16 +393,20 @@ public class TestHeapSize {
|
|||
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
|
||||
expected += ClassSize.estimateBase(AtomicReference.class, false);
|
||||
expected += ClassSize.estimateBase(CellSet.class, false);
|
||||
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
|
||||
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
|
||||
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
|
||||
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
|
||||
if (expected != actual) {
|
||||
ClassSize.estimateBase(cl, true);
|
||||
ClassSize.estimateBase(AtomicLong.class, true);
|
||||
ClassSize.estimateBase(AtomicLong.class, true);
|
||||
ClassSize.estimateBase(AtomicReference.class, true);
|
||||
ClassSize.estimateBase(CellSet.class, true);
|
||||
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
|
||||
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
|
||||
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
|
||||
ClassSize.estimateBase(AtomicBoolean.class,true);
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
|
@ -411,6 +417,7 @@ public class TestHeapSize {
|
|||
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
|
||||
expected += ClassSize.estimateBase(AtomicReference.class, false);
|
||||
expected += ClassSize.estimateBase(CellSet.class, false);
|
||||
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
|
||||
expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
|
||||
if (expected != actual) {
|
||||
ClassSize.estimateBase(cl, true);
|
||||
|
@ -418,6 +425,7 @@ public class TestHeapSize {
|
|||
ClassSize.estimateBase(AtomicLong.class, true);
|
||||
ClassSize.estimateBase(AtomicReference.class, true);
|
||||
ClassSize.estimateBase(CellSet.class, true);
|
||||
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
|
||||
ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
|
@ -428,6 +436,7 @@ public class TestHeapSize {
|
|||
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
|
||||
expected += ClassSize.estimateBase(AtomicReference.class, false);
|
||||
expected += ClassSize.estimateBase(CellSet.class, false);
|
||||
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
|
||||
expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
|
||||
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
|
||||
if (expected != actual) {
|
||||
|
@ -436,6 +445,7 @@ public class TestHeapSize {
|
|||
ClassSize.estimateBase(AtomicLong.class, true);
|
||||
ClassSize.estimateBase(AtomicReference.class, true);
|
||||
ClassSize.estimateBase(CellSet.class, true);
|
||||
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
|
||||
ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
|
||||
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
|
||||
assertEquals(expected, actual);
|
||||
|
@ -446,6 +456,7 @@ public class TestHeapSize {
|
|||
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
|
||||
expected += ClassSize.estimateBase(AtomicReference.class, false);
|
||||
expected += ClassSize.estimateBase(CellSet.class, false);
|
||||
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
|
||||
expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
|
||||
expected += ClassSize.estimateBase(CellArrayMap.class, false);
|
||||
if (expected != actual) {
|
||||
|
@ -454,6 +465,7 @@ public class TestHeapSize {
|
|||
ClassSize.estimateBase(AtomicLong.class, true);
|
||||
ClassSize.estimateBase(AtomicReference.class, true);
|
||||
ClassSize.estimateBase(CellSet.class, true);
|
||||
ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
|
||||
ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
|
||||
ClassSize.estimateBase(CellArrayMap.class, true);
|
||||
assertEquals(expected, actual);
|
||||
|
|
|
@ -838,7 +838,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
byte[] row = Bytes.toBytes(keys[i]);
|
||||
byte[] val = Bytes.toBytes(keys[i] + i);
|
||||
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
|
||||
totalLen += kv.getLength();
|
||||
totalLen += Segment.getCellLength(kv);
|
||||
hmc.add(kv, null);
|
||||
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
|
||||
}
|
||||
|
@ -859,7 +859,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
|
|||
Threads.sleep(1); // to make sure each kv gets a different ts
|
||||
byte[] row = Bytes.toBytes(keys[i]);
|
||||
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
|
||||
totalLen += kv.getLength();
|
||||
totalLen += Segment.getCellLength(kv);
|
||||
hmc.add(kv, null);
|
||||
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
|
||||
}
|
||||
|
|
|
@ -757,7 +757,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
// set memstore to flat into CellChunkMap
|
||||
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
|
||||
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
|
||||
String.valueOf(compactionType));
|
||||
String.valueOf(compactionType));
|
||||
((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
|
||||
((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
|
||||
|
||||
|
@ -796,11 +796,13 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
// One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
|
||||
// totalCellsLen should remain the same
|
||||
long oneCellOnCCMHeapSize =
|
||||
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
|
||||
(long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
|
||||
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
|
||||
+ numOfCells * oneCellOnCCMHeapSize;
|
||||
|
||||
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
|
||||
assertEquals(totalCellsLen+ChunkCreator.SIZEOF_CHUNK_HEADER, regionServicesForStores
|
||||
.getMemStoreSize());
|
||||
|
||||
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
||||
|
||||
MemStoreSize mss = memstore.getFlushableSize();
|
||||
|
@ -824,8 +826,9 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
// but smaller than the size of two cells.
|
||||
// Therefore, the two created cells are flattened together.
|
||||
totalHeapSize = MutableSegment.DEEP_OVERHEAD
|
||||
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
|
||||
+ 2 * oneCellOnCCMHeapSize;
|
||||
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
|
||||
+ 1 * oneCellOnCSLMHeapSize
|
||||
+ 1 * oneCellOnCCMHeapSize;
|
||||
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
||||
}
|
||||
|
||||
|
@ -848,6 +851,8 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
|
||||
memstore.getConfiguration().setInt(MemStoreCompactionStrategy
|
||||
.COMPACTING_MEMSTORE_THRESHOLD_KEY, 4);
|
||||
memstore.getConfiguration()
|
||||
.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.014);
|
||||
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
|
||||
String.valueOf(compactionType));
|
||||
((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
|
||||
|
@ -860,39 +865,42 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
|
|||
String bigVal = new String(chars);
|
||||
byte[] val = Bytes.toBytes(bigVal);
|
||||
|
||||
// We need to add two cells, five times, in order to guarantee a merge
|
||||
// We need to add two cells, three times, in order to guarantee a merge
|
||||
List<String[]> keysList = new ArrayList<>();
|
||||
keysList.add(new String[]{"A", "B"});
|
||||
keysList.add(new String[]{"C", "D"});
|
||||
keysList.add(new String[]{"E", "F"});
|
||||
keysList.add(new String[]{"G", "H"});
|
||||
keysList.add(new String[]{"I", "J"});
|
||||
|
||||
// Measuring the size of a single kv
|
||||
KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
|
||||
Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val);
|
||||
long oneCellOnCCMHeapSize =
|
||||
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
|
||||
|
||||
(long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
|
||||
long oneCellOnCSLMHeapSize =
|
||||
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
|
||||
long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
|
||||
for (int i = 0; i < 5; i++) {
|
||||
for (int i = 0; i < keysList.size(); i++) {
|
||||
addRowsByKeys(memstore, keysList.get(i), val);
|
||||
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
|
||||
Threads.sleep(10);
|
||||
}
|
||||
|
||||
// The in-memory flush size is bigger than the size of a single cell,
|
||||
// but smaller than the size of two cells.
|
||||
// Therefore, the two created cells are flattened together.
|
||||
totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
|
||||
+ 2 * oneCellOnCCMHeapSize;
|
||||
if (i == 4) {
|
||||
// Four out of the five are merged into one,
|
||||
// and the segment becomes immutable
|
||||
totalHeapSize -= (3 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
|
||||
+ MutableSegment.DEEP_OVERHEAD);
|
||||
if(i==0) {
|
||||
totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
|
||||
+ oneCellOnCCMHeapSize + oneCellOnCSLMHeapSize;
|
||||
} else {
|
||||
// The in-memory flush size is bigger than the size of a single cell,
|
||||
// but smaller than the size of two cells.
|
||||
// Therefore, the two created cells are flattened in a seperate segment.
|
||||
totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize);
|
||||
}
|
||||
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
||||
if (i == 2) {
|
||||
// Four out of the five segments are merged into one
|
||||
totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
|
||||
totalHeapSize = ClassSize.align(totalHeapSize);
|
||||
}
|
||||
assertEquals("i="+i, totalHeapSize, ((CompactingMemStore) memstore).heapSize());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1765,15 +1765,12 @@ public class TestHStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldFlushInMemory() {
|
||||
boolean rval = super.shouldFlushInMemory();
|
||||
if (rval) {
|
||||
RUNNER_COUNT.incrementAndGet();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("runner count: " + RUNNER_COUNT.get());
|
||||
}
|
||||
void inMemoryCompaction() {
|
||||
RUNNER_COUNT.incrementAndGet();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("runner count: " + RUNNER_COUNT.get());
|
||||
}
|
||||
return rval;
|
||||
super.inMemoryCompaction();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue