HBASE-18375: Fix the bug where pool chunks from ChunkCreator are deallocated instead of being returned to the pool, because there is no strong reference to them
parent d7a74a75a1
commit 68ec2a9da0
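In short: before this change, ChunkCreator tracked the chunks it handed out only through a map of SoftReferences. Under heap pressure the GC may clear a SoftReference, so by the time an MSLAB released its chunk IDs the mapping could already be empty, and pool chunks were deallocated instead of being recycled. The sketch below (simplified, hypothetical class names, not the HBase code itself) illustrates the failure mode and the fix of holding strong references:

import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, self-contained illustration (hypothetical names, not the HBase classes).
public class SoftRefPitfall {

  static class Chunk {
    final int id;
    Chunk(int id) { this.id = id; }
  }

  // Before the fix: the GC may clear these values under memory pressure,
  // so a later lookup by chunk ID can come back empty and the chunk is lost.
  static final Map<Integer, SoftReference<Chunk>> softMap = new ConcurrentHashMap<>();

  // After the fix: strong references keep mapped chunks alive until they are
  // explicitly removed, so pool chunks can always be found and recycled.
  static final Map<Integer, Chunk> strongMap = new ConcurrentHashMap<>();

  public static void main(String[] args) {
    softMap.put(1, new SoftReference<>(new Chunk(1)));
    strongMap.put(2, new Chunk(2));

    // after a GC cycle under pressure this may be null ...
    Chunk maybeGone = softMap.get(1).get();
    // ... while this lookup always succeeds until remove() is called
    Chunk alwaysThere = strongMap.get(2);
    System.out.println("soft: " + maybeGone + ", strong: " + alwaysThere);
  }
}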
@@ -50,10 +50,9 @@ public class ChunkCreator {
   // monotonically increasing chunkid
   private AtomicInteger chunkID = new AtomicInteger(1);
   // maps the chunk against the monotonically increasing chunk id. We need to preserve the
-  // natural ordering of the key
-  // CellChunkMap creation should convert the soft ref to hard reference
-  private Map<Integer, SoftReference<Chunk>> chunkIdMap =
-      new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
+  // natural ordering of the key. It also helps to protect from GC.
+  private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
+
   private final int chunkSize;
   private final boolean offheap;
   @VisibleForTesting
@@ -75,7 +74,7 @@ public class ChunkCreator {
   }
 
   /**
-   * Initializes the instance of MSLABChunkCreator
+   * Initializes the instance of ChunkCreator
    * @param chunkSize the chunkSize
    * @param offheap indicates if the chunk is to be created offheap or not
    * @param globalMemStoreSize the global memstore size
@@ -100,10 +99,19 @@ public class ChunkCreator {
   }
 
   /**
-   * Creates and inits a chunk.
+   * Creates and inits a chunk. The default implementation.
    * @return the chunk that was initialized
    */
   Chunk getChunk() {
+    return getChunk(CompactingMemStore.IndexType.ARRAY_MAP);
+  }
+
+  /**
+   * Creates and inits a chunk.
+   * @return the chunk that was initialized
+   * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
+   */
+  Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
     Chunk chunk = null;
     if (pool != null) {
       // the pool creates the chunk internally. The chunk#init() call happens here
@@ -117,44 +125,49 @@ public class ChunkCreator {
       }
     }
     if (chunk == null) {
-      chunk = createChunk();
+      // the second boolean parameter means:
+      // if CellChunkMap index is requested, put allocated on demand chunk mapping into chunkIdMap
+      chunk = createChunk(false, chunkIndexType);
     }
-    // put this chunk into the chunkIdMap
-    this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
 
     // now we need to actually do the expensive memory allocation step in case of a new chunk,
     // else only the offset is set to the beginning of the chunk to accept allocations
     chunk.init();
     return chunk;
   }
 
-  private Chunk createChunk() {
-    return createChunk(false);
+  private Chunk createChunkForPool() {
+    return createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
   }
 
   /**
    * Creates the chunk either onheap or offheap
    * @param pool indicates if the chunks have to be created which will be used by the Pool
+   * @param chunkIndexType
    * @return the chunk
    */
-  private Chunk createChunk(boolean pool) {
+  private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType) {
+    Chunk chunk = null;
     int id = chunkID.getAndIncrement();
     assert id > 0;
     // do not create offheap chunk on demand
     if (pool && this.offheap) {
-      return new OffheapChunk(chunkSize, id, pool);
+      chunk = new OffheapChunk(chunkSize, id, pool);
     } else {
-      return new OnheapChunk(chunkSize, id, pool);
+      chunk = new OnheapChunk(chunkSize, id, pool);
     }
+    if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) {
+      // put the pool chunk into the chunkIdMap so it is not GC-ed
+      this.chunkIdMap.put(chunk.getId(), chunk);
+    }
+    return chunk;
   }
 
   @VisibleForTesting
-  // TODO : To be used by CellChunkMap
+  // Used to translate the ChunkID into a chunk ref
   Chunk getChunk(int id) {
-    SoftReference<Chunk> ref = chunkIdMap.get(id);
-    if (ref != null) {
-      return ref.get();
-    }
-    return null;
+    // can return null if chunk was never mapped
+    return chunkIdMap.get(id);
   }
 
   int getChunkSize() {
@@ -170,15 +183,13 @@ public class ChunkCreator {
   }
 
   Chunk removeChunk(int chunkId) {
-    SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
-    if (ref != null) {
-      return ref.get();
-    }
-    return null;
+    return this.chunkIdMap.remove(chunkId);
   }
 
   @VisibleForTesting
-  int size() {
+  // the chunks in the chunkIdMap may already be released, so we shouldn't rely
+  // on this count for strong correctness. This method is used only in testing.
+  int numberOfMappedChunks() {
     return this.chunkIdMap.size();
   }
 
@@ -213,7 +224,8 @@ public class ChunkCreator {
       this.poolSizePercentage = poolSizePercentage;
       this.reclaimedChunks = new LinkedBlockingQueue<>();
       for (int i = 0; i < initialCount; i++) {
-        Chunk chunk = createChunk(true);
+        // Chunks from pool are covered with strong references anyway
+        Chunk chunk = createChunkForPool();
         chunk.init();
         reclaimedChunks.add(chunk);
       }
@@ -232,7 +244,7 @@ public class ChunkCreator {
      * then.
      * Note: Chunks returned by this pool must be put back to the pool after its use.
      * @return a chunk
-     * @see #putbackChunks(Set)
+     * @see #putbackChunks(Chunk)
      */
     Chunk getChunk() {
       Chunk chunk = reclaimedChunks.poll();
@@ -245,7 +257,7 @@ public class ChunkCreator {
           long created = this.chunkCount.get();
           if (created < this.maxCount) {
             if (this.chunkCount.compareAndSet(created, created + 1)) {
-              chunk = createChunk(true);
+              chunk = createChunkForPool();
               break;
             }
           } else {
@@ -259,21 +271,16 @@ public class ChunkCreator {
     /**
      * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining
      * chunks
-     * @param chunks
+     * @param c
      */
-    private void putbackChunks(Set<Integer> chunks) {
-      int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
-      Iterator<Integer> iterator = chunks.iterator();
-      while (iterator.hasNext()) {
-        Integer chunkId = iterator.next();
-        // remove the chunks every time though they are from the pool or not
-        Chunk chunk = ChunkCreator.this.removeChunk(chunkId);
-        if (chunk != null) {
-          if (chunk.isFromPool() && toAdd > 0) {
-            reclaimedChunks.add(chunk);
-          }
-          toAdd--;
-        }
+    private void putbackChunks(Chunk c) {
+      int toAdd = this.maxCount - reclaimedChunks.size();
+      if (c.isFromPool() && toAdd > 0) {
+        reclaimedChunks.add(c);
+      } else {
+        // remove the chunk (it is not going back to the pool),
+        // whether it was initially from the pool or not
+        ChunkCreator.this.removeChunk(c.getId());
       }
     }
 
@@ -384,6 +391,20 @@ public class ChunkCreator {
     return 0;
   }
 
+  @VisibleForTesting
+  boolean isChunkInPool(int chunkId) {
+    if (pool != null) {
+      // chunks that came from the pool map to a non-null chunk reference
+      Chunk c = getChunk(chunkId);
+      if (c == null) {
+        return false;
+      }
+      return pool.reclaimedChunks.contains(c);
+    }
+
+    return false;
+  }
+
   /*
    * Only used in testing
   */
@@ -395,10 +416,24 @@ public class ChunkCreator {
   }
 
   synchronized void putbackChunks(Set<Integer> chunks) {
-    if (pool != null) {
-      pool.putbackChunks(chunks);
-    } else {
+    // if there is no pool, just try to clear the chunkIdMap in case there is something there
+    if (pool == null) {
       this.removeChunks(chunks);
+      return;
+    }
+
+    // if there is a pool, go over all chunk IDs that came back; they may or may not be from the pool
+    for (int chunkID : chunks) {
+      // translate the chunk ID to a chunk; if the chunk was never in the pool
+      // this translation will (most likely) return null
+      Chunk chunk = ChunkCreator.this.getChunk(chunkID);
+      if (chunk != null) {
+        pool.putbackChunks(chunk);
+      }
+      // if chunk is null it was never covered by the chunkIdMap (and so was never in the pool either),
+      // so there is nothing to do on its release
     }
+    return;
   }
 
 }
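Taken together, the ChunkCreator changes make chunkIdMap the single strong-reference registry: pool chunks (and on-demand chunks requested for a CellChunkMap index) are mapped at creation time, and putbackChunks() translates returned IDs through that map. A condensed, standalone approximation of the new putback flow (simplified types; a sketch, not the patched class itself):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the patched putback flow; field names follow the diff above,
// but this is a self-contained approximation, not the real class.
class PutbackSketch {
  static class Chunk {
    final int id; final boolean fromPool;
    Chunk(int id, boolean fromPool) { this.id = id; this.fromPool = fromPool; }
  }

  final Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<>();
  final BlockingQueue<Chunk> reclaimedChunks = new LinkedBlockingQueue<>();
  final int maxCount = 16;

  synchronized void putbackChunks(Set<Integer> chunks) {
    for (int chunkID : chunks) {
      Chunk chunk = chunkIdMap.get(chunkID);  // strong ref: pool chunks are always found
      if (chunk == null) {
        continue;  // never mapped: an on-demand chunk, nothing to release
      }
      if (chunk.fromPool && reclaimedChunks.size() < maxCount) {
        reclaimedChunks.add(chunk);           // back to the pool for reuse
      } else {
        chunkIdMap.remove(chunkID);           // drop the strong ref; chunk becomes GC-able
      }
    }
  }
}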
|
||||
|
|
|
@@ -57,6 +57,12 @@ public class CompactingMemStore extends AbstractMemStore {
       "hbase.hregion.compacting.memstore.type";
   public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
       String.valueOf(MemoryCompactionPolicy.BASIC);
+  // The external setting of the compacting MemStore behaviour
+  public static final String COMPACTING_MEMSTORE_INDEX_KEY =
+      "hbase.hregion.compacting.memstore.index";
+  // usage of CellArrayMap is default, later it will be decided how to use CellChunkMap
+  public static final String COMPACTING_MEMSTORE_INDEX_DEFAULT =
+      String.valueOf(IndexType.ARRAY_MAP);
   // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
@@ -78,10 +84,22 @@ public class CompactingMemStore extends AbstractMemStore {
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
   private boolean compositeSnapshot = true;
 
+  /**
+   * Types of indexes (part of immutable segments) to be used after flattening,
+   * compaction, or merge are applied.
+   */
+  public enum IndexType {
+    CSLM_MAP,   // ConcurrentSkipListMap
+    ARRAY_MAP,  // CellArrayMap
+    CHUNK_MAP   // CellChunkMap
+  }
+
+  private IndexType indexType = IndexType.ARRAY_MAP; // default implementation
+
   public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
-      + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
-      // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+      + 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
+      // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
+      // indexType
       + Bytes.SIZEOF_LONG // inmemoryFlushSize
       + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
       + 2 * ClassSize.ATOMIC_BOOLEAN // inMemoryFlushInProgress and allowCompaction
@@ -96,6 +114,8 @@ public class CompactingMemStore extends AbstractMemStore {
     this.pipeline = new CompactionPipeline(getRegionServices());
     this.compactor = createMemStoreCompactor(compactionPolicy);
     initInmemoryFlushSize(conf);
+    indexType = IndexType.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+        CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
   }
 
   @VisibleForTesting
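Since the index type is now read from configuration at construction time, it can presumably be selected per deployment. A hedged sketch of such setup code (using the constants added in this commit; `conf` is an ordinary Hadoop Configuration, and the snippet belongs inside whatever code builds it):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Sketch: selecting the memstore index type via the new key added in this commit.
Configuration conf = HBaseConfiguration.create();
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
    String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP)); // or ARRAY_MAP (the default)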
@@ -256,6 +256,8 @@ public class CompactionPipeline {
 
   private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
+    pipeline.removeAll(suffix);
+    if(segment != null) pipeline.addLast(segment);
     // During index merge we won't be closing the segments undergoing the merge. Segment#close()
     // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
     // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
@@ -267,8 +269,6 @@ public class CompactionPipeline {
         itemInSuffix.close();
       }
     }
-    pipeline.removeAll(suffix);
-    if(segment != null) pipeline.addLast(segment);
   }
 
   public Segment getTail() {
@@ -269,4 +269,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
     }
     return pooledChunks;
   }
+
+  @VisibleForTesting Integer getNumOfChunksReturnedToPool() {
+    int i = 0;
+    for (Integer id : this.chunks) {
+      if (chunkCreator.isChunkInPool(id)) {
+        i++;
+      }
+    }
+    return i;
+  }
 }
@@ -247,14 +247,16 @@ public class TestMemStoreLAB {
         }
       }
       // none of the chunkIds would have been returned back
-      assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0);
+      assertTrue("All the chunks must have been cleared",
+          ChunkCreator.INSTANCE.numberOfMappedChunks() != 0);
+      int pooledChunksNum = mslab.getPooledChunks().size();
       // close the mslab
       mslab.close();
-      // make sure all chunks reclaimed or removed from chunk queue
-      int queueLength = mslab.getPooledChunks().size();
+      // make sure all chunks were reclaimed back to the pool
+      int queueLength = mslab.getNumOfChunksReturnedToPool();
       assertTrue("All chunks in chunk queue should be reclaimed or removed"
-          + " after mslab closed but actually: " + queueLength,
-          queueLength == 0);
+          + " after mslab closed but actually: " + (pooledChunksNum - queueLength),
+          pooledChunksNum - queueLength == 0);
     } finally {
       ChunkCreator.INSTANCE = oldInstance;
     }
@@ -140,7 +140,8 @@ public class TestMemstoreLABWithoutPool {
       mslab[i].close();
     }
     // all of the chunkIds would have been returned back
-    assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0);
+    assertTrue("All the chunks must have been cleared",
+        ChunkCreator.INSTANCE.numberOfMappedChunks() == 0);
   }
 
   private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
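The reworked TestMemStoreLAB check, condensed: record how many pooled chunks the MSLAB holds, close it, then verify every pool-originated chunk is findable back in the pool. Roughly (restating the diff above with the helpers this commit adds, not additional test code):

// count the pool-originated chunks the MSLAB currently holds
int pooledChunksNum = mslab.getPooledChunks().size();
mslab.close();
// after close(), every pool-originated chunk must be back in ChunkCreator's
// reclaimed queue, so the difference between held and returned must be zero
int returned = mslab.getNumOfChunksReturnedToPool();
assertTrue("Chunks not returned to pool: " + (pooledChunksNum - returned),
    pooledChunksNum - returned == 0);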