HBASE-18375: Fix the bug where the pool chunks from ChunkCreator are deallocated and not returned to pool, because there is no reference to them
This commit is contained in:
parent
092dc6de84
commit
75a6b36849
|
@ -176,10 +176,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
||||||
private int createCellReference(ByteBufferKeyValue cell, ByteBuffer idxBuffer, int idxOffset) {
|
private int createCellReference(ByteBufferKeyValue cell, ByteBuffer idxBuffer, int idxOffset) {
|
||||||
int offset = idxOffset;
|
int offset = idxOffset;
|
||||||
int dataChunkID = cell.getChunkId();
|
int dataChunkID = cell.getChunkId();
|
||||||
// ensure strong pointer to data chunk, as index is no longer directly points to it
|
|
||||||
Chunk c = ChunkCreator.getInstance().saveChunkFromGC(dataChunkID);
|
|
||||||
// if c is null, it means that this cell chunks was already released shouldn't happen
|
|
||||||
assert (c!=null);
|
|
||||||
offset = ByteBufferUtils.putInt(idxBuffer, offset, dataChunkID); // write data chunk id
|
offset = ByteBufferUtils.putInt(idxBuffer, offset, dataChunkID); // write data chunk id
|
||||||
offset = ByteBufferUtils.putInt(idxBuffer, offset, cell.getOffset()); // offset
|
offset = ByteBufferUtils.putInt(idxBuffer, offset, cell.getOffset()); // offset
|
||||||
offset = ByteBufferUtils.putInt(idxBuffer, offset, KeyValueUtil.length(cell)); // length
|
offset = ByteBufferUtils.putInt(idxBuffer, offset, KeyValueUtil.length(cell)); // length
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.lang.ref.WeakReference;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -58,21 +57,8 @@ public class ChunkCreator {
|
||||||
// the header size need to be changed in case chunk id size is changed
|
// the header size need to be changed in case chunk id size is changed
|
||||||
public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;
|
public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;
|
||||||
|
|
||||||
// An object pointed by a weak reference can be garbage collected, in opposite to an object
|
// mapping from chunk IDs to chunks
|
||||||
// referenced by a strong (regular) reference. Every chunk created via ChunkCreator is referenced
|
private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
|
||||||
// from either weakChunkIdMap or strongChunkIdMap.
|
|
||||||
// Upon chunk C creation, C's ID is mapped into weak reference to C, in order not to disturb C's
|
|
||||||
// GC in case all other reference to C are going to be removed.
|
|
||||||
// When chunk C is referenced from CellChunkMap (via C's ID) it is possible to GC the chunk C.
|
|
||||||
// To avoid that upon inserting C into CellChunkMap, C's ID is mapped into strong (regular)
|
|
||||||
// reference to C.
|
|
||||||
|
|
||||||
// map that doesn't influence GC
|
|
||||||
private Map<Integer, WeakReference<Chunk>> weakChunkIdMap =
|
|
||||||
new ConcurrentHashMap<Integer, WeakReference<Chunk>>();
|
|
||||||
|
|
||||||
// map that keeps chunks from garbage collection
|
|
||||||
private Map<Integer, Chunk> strongChunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
|
|
||||||
|
|
||||||
private final int chunkSize;
|
private final int chunkSize;
|
||||||
private final boolean offheap;
|
private final boolean offheap;
|
||||||
|
@ -95,7 +81,7 @@ public class ChunkCreator {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes the instance of MSLABChunkCreator
|
* Initializes the instance of ChunkCreator
|
||||||
* @param chunkSize the chunkSize
|
* @param chunkSize the chunkSize
|
||||||
* @param offheap indicates if the chunk is to be created offheap or not
|
* @param offheap indicates if the chunk is to be created offheap or not
|
||||||
* @param globalMemStoreSize the global memstore size
|
* @param globalMemStoreSize the global memstore size
|
||||||
|
@ -120,10 +106,19 @@ public class ChunkCreator {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates and inits a chunk.
|
* Creates and inits a chunk. The default implementation.
|
||||||
* @return the chunk that was initialized
|
* @return the chunk that was initialized
|
||||||
*/
|
*/
|
||||||
Chunk getChunk() {
|
Chunk getChunk() {
|
||||||
|
return getChunk(CompactingMemStore.IndexType.ARRAY_MAP);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates and inits a chunk.
|
||||||
|
* @return the chunk that was initialized
|
||||||
|
* @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
|
||||||
|
*/
|
||||||
|
Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
|
||||||
Chunk chunk = null;
|
Chunk chunk = null;
|
||||||
if (pool != null) {
|
if (pool != null) {
|
||||||
// the pool creates the chunk internally. The chunk#init() call happens here
|
// the pool creates the chunk internally. The chunk#init() call happens here
|
||||||
|
@ -137,68 +132,45 @@ public class ChunkCreator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (chunk == null) {
|
if (chunk == null) {
|
||||||
chunk = createChunk();
|
// the second boolean parameter means:
|
||||||
|
// if CellChunkMap index is requested, put allocated on demand chunk mapping into chunkIdMap
|
||||||
|
chunk = createChunk(false, chunkIndexType);
|
||||||
}
|
}
|
||||||
// put this chunk initially into the weakChunkIdMap
|
|
||||||
this.weakChunkIdMap.put(chunk.getId(), new WeakReference<>(chunk));
|
|
||||||
// now we need to actually do the expensive memory allocation step in case of a new chunk,
|
// now we need to actually do the expensive memory allocation step in case of a new chunk,
|
||||||
// else only the offset is set to the beginning of the chunk to accept allocations
|
// else only the offset is set to the beginning of the chunk to accept allocations
|
||||||
chunk.init();
|
chunk.init();
|
||||||
return chunk;
|
return chunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Chunk createChunk() {
|
|
||||||
return createChunk(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates the chunk either onheap or offheap
|
* Creates the chunk either onheap or offheap
|
||||||
* @param pool indicates if the chunks have to be created which will be used by the Pool
|
* @param pool indicates if the chunks have to be created which will be used by the Pool
|
||||||
|
* @param chunkIndexType
|
||||||
* @return the chunk
|
* @return the chunk
|
||||||
*/
|
*/
|
||||||
private Chunk createChunk(boolean pool) {
|
private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType) {
|
||||||
|
Chunk chunk = null;
|
||||||
int id = chunkID.getAndIncrement();
|
int id = chunkID.getAndIncrement();
|
||||||
assert id > 0;
|
assert id > 0;
|
||||||
// do not create offheap chunk on demand
|
// do not create offheap chunk on demand
|
||||||
if (pool && this.offheap) {
|
if (pool && this.offheap) {
|
||||||
return new OffheapChunk(chunkSize, id, pool);
|
chunk = new OffheapChunk(chunkSize, id, pool);
|
||||||
} else {
|
} else {
|
||||||
return new OnheapChunk(chunkSize, id, pool);
|
chunk = new OnheapChunk(chunkSize, id, pool);
|
||||||
}
|
}
|
||||||
|
if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) {
|
||||||
|
// put the pool chunk into the chunkIdMap so it is not GC-ed
|
||||||
|
this.chunkIdMap.put(chunk.getId(), chunk);
|
||||||
|
}
|
||||||
|
return chunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
// Used to translate the ChunkID into a chunk ref
|
// Used to translate the ChunkID into a chunk ref
|
||||||
Chunk getChunk(int id) {
|
Chunk getChunk(int id) {
|
||||||
WeakReference<Chunk> ref = weakChunkIdMap.get(id);
|
// can return null if chunk was never mapped
|
||||||
if (ref != null) {
|
return chunkIdMap.get(id);
|
||||||
return ref.get();
|
|
||||||
}
|
|
||||||
// check also the strong mapping
|
|
||||||
return strongChunkIdMap.get(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// transfer the weak pointer to be a strong chunk pointer
|
|
||||||
Chunk saveChunkFromGC(int chunkID) {
|
|
||||||
Chunk c = strongChunkIdMap.get(chunkID); // check whether the chunk is already protected
|
|
||||||
if (c != null) // with strong pointer
|
|
||||||
return c;
|
|
||||||
WeakReference<Chunk> ref = weakChunkIdMap.get(chunkID);
|
|
||||||
if (ref != null) {
|
|
||||||
c = ref.get();
|
|
||||||
}
|
|
||||||
if (c != null) {
|
|
||||||
// put this strong reference to chunk into the strongChunkIdMap
|
|
||||||
// the read of the weakMap is always happening before the read of the strongMap
|
|
||||||
// so no synchronization issues here
|
|
||||||
this.strongChunkIdMap.put(chunkID, c);
|
|
||||||
this.weakChunkIdMap.remove(chunkID);
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
// we should actually never return null as someone should not ask to save from GC a chunk,
|
|
||||||
// which is already released. However, we are not asserting it here and we let the caller
|
|
||||||
// to deal with the return value an assert if needed
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int getChunkSize() {
|
int getChunkSize() {
|
||||||
|
@ -210,30 +182,23 @@ public class ChunkCreator {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeChunks(Set<Integer> chunkIDs) {
|
private void removeChunks(Set<Integer> chunkIDs) {
|
||||||
this.weakChunkIdMap.keySet().removeAll(chunkIDs);
|
this.chunkIdMap.keySet().removeAll(chunkIDs);
|
||||||
this.strongChunkIdMap.keySet().removeAll(chunkIDs);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Chunk removeChunk(int chunkId) {
|
Chunk removeChunk(int chunkId) {
|
||||||
WeakReference<Chunk> weak = this.weakChunkIdMap.remove(chunkId);
|
return this.chunkIdMap.remove(chunkId);
|
||||||
Chunk strong = this.strongChunkIdMap.remove(chunkId);
|
|
||||||
if (weak != null) {
|
|
||||||
return weak.get();
|
|
||||||
}
|
|
||||||
return strong;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
// the chunks in the weakChunkIdMap may already be released so we shouldn't relay
|
// the chunks in the chunkIdMap may already be released so we shouldn't relay
|
||||||
// on this counting for strong correctness. This method is used only in testing.
|
// on this counting for strong correctness. This method is used only in testing.
|
||||||
int size() {
|
int numberOfMappedChunks() {
|
||||||
return this.weakChunkIdMap.size()+this.strongChunkIdMap.size();
|
return this.chunkIdMap.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void clearChunkIds() {
|
void clearChunkIds() {
|
||||||
this.strongChunkIdMap.clear();
|
this.chunkIdMap.clear();
|
||||||
this.weakChunkIdMap.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -262,7 +227,9 @@ public class ChunkCreator {
|
||||||
this.poolSizePercentage = poolSizePercentage;
|
this.poolSizePercentage = poolSizePercentage;
|
||||||
this.reclaimedChunks = new LinkedBlockingQueue<>();
|
this.reclaimedChunks = new LinkedBlockingQueue<>();
|
||||||
for (int i = 0; i < initialCount; i++) {
|
for (int i = 0; i < initialCount; i++) {
|
||||||
Chunk chunk = createChunk(true);
|
// Chunks from pool are covered with strong references anyway
|
||||||
|
// TODO: change to CHUNK_MAP if it is generally defined
|
||||||
|
Chunk chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
|
||||||
chunk.init();
|
chunk.init();
|
||||||
reclaimedChunks.add(chunk);
|
reclaimedChunks.add(chunk);
|
||||||
}
|
}
|
||||||
|
@ -281,7 +248,7 @@ public class ChunkCreator {
|
||||||
* then.
|
* then.
|
||||||
* Note: Chunks returned by this pool must be put back to the pool after its use.
|
* Note: Chunks returned by this pool must be put back to the pool after its use.
|
||||||
* @return a chunk
|
* @return a chunk
|
||||||
* @see #putbackChunks(Set)
|
* @see #putbackChunks(Chunk)
|
||||||
*/
|
*/
|
||||||
Chunk getChunk() {
|
Chunk getChunk() {
|
||||||
Chunk chunk = reclaimedChunks.poll();
|
Chunk chunk = reclaimedChunks.poll();
|
||||||
|
@ -294,7 +261,8 @@ public class ChunkCreator {
|
||||||
long created = this.chunkCount.get();
|
long created = this.chunkCount.get();
|
||||||
if (created < this.maxCount) {
|
if (created < this.maxCount) {
|
||||||
if (this.chunkCount.compareAndSet(created, created + 1)) {
|
if (this.chunkCount.compareAndSet(created, created + 1)) {
|
||||||
chunk = createChunk(true);
|
// TODO: change to CHUNK_MAP if it is generally defined
|
||||||
|
chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -308,21 +276,16 @@ public class ChunkCreator {
|
||||||
/**
|
/**
|
||||||
* Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining
|
* Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining
|
||||||
* chunks
|
* chunks
|
||||||
* @param chunks
|
* @param c
|
||||||
*/
|
*/
|
||||||
private void putbackChunks(Set<Integer> chunks) {
|
private void putbackChunks(Chunk c) {
|
||||||
int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
|
int toAdd = this.maxCount - reclaimedChunks.size();
|
||||||
Iterator<Integer> iterator = chunks.iterator();
|
if (c.isFromPool() && toAdd > 0) {
|
||||||
while (iterator.hasNext()) {
|
reclaimedChunks.add(c);
|
||||||
Integer chunkId = iterator.next();
|
} else {
|
||||||
// remove the chunks every time though they are from the pool or not
|
// remove the chunk (that is not going to pool)
|
||||||
Chunk chunk = ChunkCreator.this.removeChunk(chunkId);
|
// though it is initially from the pool or not
|
||||||
if (chunk != null) {
|
ChunkCreator.this.removeChunk(c.getId());
|
||||||
if (chunk.isFromPool() && toAdd > 0) {
|
|
||||||
reclaimedChunks.add(chunk);
|
|
||||||
}
|
|
||||||
toAdd--;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,6 +396,20 @@ public class ChunkCreator {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean isChunkInPool(int chunkId) {
|
||||||
|
if (pool != null) {
|
||||||
|
// chunks that are from pool will return true chunk reference not null
|
||||||
|
Chunk c = getChunk(chunkId);
|
||||||
|
if (c==null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return pool.reclaimedChunks.contains(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Only used in testing
|
* Only used in testing
|
||||||
*/
|
*/
|
||||||
|
@ -444,10 +421,24 @@ public class ChunkCreator {
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void putbackChunks(Set<Integer> chunks) {
|
synchronized void putbackChunks(Set<Integer> chunks) {
|
||||||
if (pool != null) {
|
// if there is no pool just try to clear the chunkIdMap in case there is something
|
||||||
pool.putbackChunks(chunks);
|
if ( pool == null ) {
|
||||||
} else {
|
|
||||||
this.removeChunks(chunks);
|
this.removeChunks(chunks);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if there is pool, go over all chunk IDs that came back, the chunks may be from pool or not
|
||||||
|
for (int chunkID : chunks) {
|
||||||
|
// translate chunk ID to chunk, if chunk initially wasn't in pool
|
||||||
|
// this translation will (most likely) return null
|
||||||
|
Chunk chunk = ChunkCreator.this.getChunk(chunkID);
|
||||||
|
if (chunk != null) {
|
||||||
|
pool.putbackChunks(chunk);
|
||||||
|
}
|
||||||
|
// if chunk is null, it was never covered by the chunkIdMap (and so wasn't in pool also),
|
||||||
|
// so we have nothing to do on its release
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,10 +43,10 @@ import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
* suffix of the pipeline.
|
* suffix of the pipeline.
|
||||||
*
|
*
|
||||||
* The synchronization model is copy-on-write. Methods which change the structure of the
|
* The synchronization model is copy-on-write. Methods which change the structure of the
|
||||||
* pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make
|
* pipeline (pushHead(), flattenOneSegment() and swap()) apply their changes in the context of a
|
||||||
* a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read
|
* lock. They also make a read-only copy of the pipeline's list. Read methods read from a
|
||||||
* method accesses the read-only copy more than once it makes a local copy of it
|
* read-only copy. If a read method accesses the read-only copy more than once it makes a local
|
||||||
* to ensure it accesses the same copy.
|
* copy of it to ensure it accesses the same copy.
|
||||||
*
|
*
|
||||||
* The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
|
* The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
|
||||||
* protected by a lock since they need to have a consistent (atomic) view of the pipeline list
|
* protected by a lock since they need to have a consistent (atomic) view of the pipeline list
|
||||||
|
@ -261,6 +261,8 @@ public class CompactionPipeline {
|
||||||
|
|
||||||
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
|
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
|
||||||
boolean closeSegmentsInSuffix) {
|
boolean closeSegmentsInSuffix) {
|
||||||
|
pipeline.removeAll(suffix);
|
||||||
|
if(segment != null) pipeline.addLast(segment);
|
||||||
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
|
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
|
||||||
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
|
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
|
||||||
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
|
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
|
||||||
|
@ -272,15 +274,20 @@ public class CompactionPipeline {
|
||||||
itemInSuffix.close();
|
itemInSuffix.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pipeline.removeAll(suffix);
|
|
||||||
if(segment != null) pipeline.addLast(segment);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// replacing one segment in the pipeline with a new one exactly at the same index
|
// replacing one segment in the pipeline with a new one exactly at the same index
|
||||||
// need to be called only within synchronized block
|
// need to be called only within synchronized block
|
||||||
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
|
||||||
|
justification="replaceAtIndex is invoked under a synchronize block so safe")
|
||||||
private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
|
private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
|
||||||
pipeline.set(idx, newSegment);
|
pipeline.set(idx, newSegment);
|
||||||
readOnlyCopy = new LinkedList<>(pipeline);
|
readOnlyCopy = new LinkedList<>(pipeline);
|
||||||
|
// the version increment is indeed needed, because the swap uses removeAll() method of the
|
||||||
|
// linked-list that compares the objects to find what to remove.
|
||||||
|
// The flattening changes the segment object completely (creation pattern) and so
|
||||||
|
// swap will not proceed correctly after concurrent flattening.
|
||||||
|
version++;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Segment getTail() {
|
public Segment getTail() {
|
||||||
|
|
|
@ -78,6 +78,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
||||||
private final int chunkSize;
|
private final int chunkSize;
|
||||||
private final int maxAlloc;
|
private final int maxAlloc;
|
||||||
private final ChunkCreator chunkCreator;
|
private final ChunkCreator chunkCreator;
|
||||||
|
private final CompactingMemStore.IndexType idxType; // what index is used for corresponding segment
|
||||||
|
|
||||||
// This flag is for closing this instance, its set when clearing snapshot of
|
// This flag is for closing this instance, its set when clearing snapshot of
|
||||||
// memstore
|
// memstore
|
||||||
|
@ -100,6 +101,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
||||||
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
|
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
|
||||||
Preconditions.checkArgument(maxAlloc <= chunkSize,
|
Preconditions.checkArgument(maxAlloc <= chunkSize,
|
||||||
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
|
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
|
||||||
|
idxType = CompactingMemStore.IndexType.valueOf(conf.get(
|
||||||
|
CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
|
||||||
|
CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -239,7 +243,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
||||||
if (c != null) {
|
if (c != null) {
|
||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
c = this.chunkCreator.getChunk();
|
c = this.chunkCreator.getChunk(idxType);
|
||||||
if (c != null) {
|
if (c != null) {
|
||||||
// set the curChunk. No need of CAS as only one thread will be here
|
// set the curChunk. No need of CAS as only one thread will be here
|
||||||
curChunk.set(c);
|
curChunk.set(c);
|
||||||
|
@ -253,12 +257,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returning a new chunk, without replacing current chunk,
|
/* Creating chunk to be used as index chunk in CellChunkMap, part of the chunks array.
|
||||||
// meaning MSLABImpl does not make the returned chunk as CurChunk.
|
** Returning a new chunk, without replacing current chunk,
|
||||||
// The space on this chunk will be allocated externally
|
** meaning MSLABImpl does not make the returned chunk as CurChunk.
|
||||||
// The interface is only for external callers
|
** The space on this chunk will be allocated externally.
|
||||||
|
** The interface is only for external callers
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Chunk getNewExternalChunk() {
|
public Chunk getNewExternalChunk() {
|
||||||
|
// the new chunk is going to be part of the chunk array and will always be referenced
|
||||||
Chunk c = this.chunkCreator.getChunk();
|
Chunk c = this.chunkCreator.getChunk();
|
||||||
chunks.add(c.getId());
|
chunks.add(c.getId());
|
||||||
return c;
|
return c;
|
||||||
|
@ -280,4 +287,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
||||||
}
|
}
|
||||||
return pooledChunks;
|
return pooledChunks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting Integer getNumOfChunksReturnedToPool() {
|
||||||
|
int i = 0;
|
||||||
|
for (Integer id : this.chunks) {
|
||||||
|
if (chunkCreator.isChunkInPool(id)) {
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return i;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -247,14 +247,16 @@ public class TestMemStoreLAB {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// none of the chunkIds would have been returned back
|
// none of the chunkIds would have been returned back
|
||||||
assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0);
|
assertTrue("All the chunks must have been cleared",
|
||||||
|
ChunkCreator.INSTANCE.numberOfMappedChunks() != 0);
|
||||||
|
int pooledChunksNum = mslab.getPooledChunks().size();
|
||||||
// close the mslab
|
// close the mslab
|
||||||
mslab.close();
|
mslab.close();
|
||||||
// make sure all chunks reclaimed or removed from chunk queue
|
// make sure all chunks where reclaimed back to pool
|
||||||
int queueLength = mslab.getPooledChunks().size();
|
int queueLength = mslab.getNumOfChunksReturnedToPool();
|
||||||
assertTrue("All chunks in chunk queue should be reclaimed or removed"
|
assertTrue("All chunks in chunk queue should be reclaimed or removed"
|
||||||
+ " after mslab closed but actually: " + queueLength,
|
+ " after mslab closed but actually: " + (pooledChunksNum-queueLength),
|
||||||
queueLength == 0);
|
pooledChunksNum-queueLength == 0);
|
||||||
} finally {
|
} finally {
|
||||||
ChunkCreator.INSTANCE = oldInstance;
|
ChunkCreator.INSTANCE = oldInstance;
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,7 +140,8 @@ public class TestMemstoreLABWithoutPool {
|
||||||
mslab[i].close();
|
mslab[i].close();
|
||||||
}
|
}
|
||||||
// all of the chunkIds would have been returned back
|
// all of the chunkIds would have been returned back
|
||||||
assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0);
|
assertTrue("All the chunks must have been cleared",
|
||||||
|
ChunkCreator.INSTANCE.numberOfMappedChunks() == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
|
private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
|
||||||
|
|
Loading…
Reference in New Issue