HBASE-16438 Create a cell type so that chunk id is embedded in it (Ram)

This commit is contained in:
Ramkrishna 2017-04-17 09:10:59 +05:30
parent c8cd921bed
commit c2c2178b2e
41 changed files with 993 additions and 482 deletions

View File

@ -3135,28 +3135,4 @@ public final class CellUtil {
return Type.DeleteFamily.getCode();
}
}
/**
* Clone the passed cell by copying its data into the passed buf.
*/
public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) {
int tagsLen = cell.getTagsLength();
if (cell instanceof ExtendedCell) {
((ExtendedCell) cell).write(buf, offset);
} else {
// Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
// other case also. The data fragments within Cell is copied into buf as in KeyValue
// serialization format only.
KeyValueUtil.appendTo(cell, buf, offset, true);
}
if (tagsLen == 0) {
// When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
// which directly return tagsLen as 0. So we avoid parsing many length components in
// reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
// call getTagsLength().
return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
} else {
return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
}
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize,
Cloneable {
public static int CELL_NOT_BASED_ON_CHUNK = -1;
/**
* Write this cell to an OutputStream in a {@link KeyValue} format.
* <br> KeyValue format <br>
@ -73,4 +74,13 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
* @return The deep cloned cell
*/
Cell deepClone();
/**
* Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized
* chunks as in case of MemstoreLAB
* @return the chunk id if the cell is backed by fixed sized Chunks, else return -1
*/
default int getChunkId() {
return CELL_NOT_BASED_ON_CHUNK;
}
}

View File

@ -748,6 +748,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.masterActiveTime = System.currentTimeMillis();
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
// Initialize the chunkCreator
initializeMemStoreChunkCreator();
this.fileSystemManager = new MasterFileSystem(this);
this.walManager = new MasterWalManager(this);

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
/**
* ByteBuffer based cell which has the chunkid at the 0th offset
* @see MemStoreLAB
*/
//TODO : When moving this cell to CellChunkMap we will have the following things
// to be serialized
// chunkId (Integer) + offset (Integer) + length (Integer) + seqId (Long) = 20 bytes
@InterfaceAudience.Private
public class ByteBufferChunkCell extends ByteBufferKeyValue {
public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
super(buf, offset, length);
}
public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
super(buf, offset, length, seqId);
}
@Override
public int getChunkId() {
// The chunkId is embedded at the 0th offset of the bytebuffer
return ByteBufferUtils.toInt(buf, 0);
}
}

View File

@ -21,8 +21,10 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* A chunk of memory out of which allocations are sliced.
@ -46,13 +48,41 @@ public abstract class Chunk {
/** Size of chunk in bytes */
protected final int size;
// The unique id associated with the chunk.
private final int id;
// indicates if the chunk is formed by ChunkCreator#MemstorePool
private final boolean fromPool;
/**
* Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
*
* Create an uninitialized chunk. Note that memory is not allocated yet, so
* this is cheap.
* @param size in bytes
* @param id the chunk id
*/
Chunk(int size) {
public Chunk(int size, int id) {
this(size, id, false);
}
/**
* Create an uninitialized chunk. Note that memory is not allocated yet, so
* this is cheap.
* @param size in bytes
* @param id the chunk id
* @param fromPool if the chunk is formed by pool
*/
public Chunk(int size, int id, boolean fromPool) {
this.size = size;
this.id = id;
this.fromPool = fromPool;
}
int getId() {
return this.id;
}
boolean isFromPool() {
return this.fromPool;
}
/**
@ -60,7 +90,24 @@ public abstract class Chunk {
* constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
* until the allocation is complete.
*/
public abstract void init();
public void init() {
assert nextFreeOffset.get() == UNINITIALIZED;
try {
allocateDataBuffer();
} catch (OutOfMemoryError e) {
boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
assert failInit; // should be true.
throw e;
}
// Mark that it's ready for use
// Move 8 bytes since the first 8 bytes are having the chunkid in it
boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG);
// We should always succeed the above CAS since only one thread
// calls init()!
Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
}
abstract void allocateDataBuffer();
/**
* Reset the offset to UNINITIALIZED before before reusing an old chunk
@ -74,7 +121,8 @@ public abstract class Chunk {
/**
* Try to allocate <code>size</code> bytes from the chunk.
*
* If a chunk is tried to get allocated before init() call, the thread doing the allocation
* will be in busy-wait state as it will keep looping till the nextFreeOffset is set.
* @return the offset of the successful allocation, or -1 to indicate not-enough-space
*/
public int alloc(int size) {
@ -96,7 +144,7 @@ public abstract class Chunk {
if (oldOffset + size > data.capacity()) {
return -1; // alloc doesn't fit
}
// TODO : If seqID is to be written add 8 bytes here for nextFreeOFfset
// Try to atomically claim this chunk
if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
// we got the alloc

View File

@ -0,0 +1,404 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.lang.ref.SoftReference;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated
* with every chunk
*/
@InterfaceAudience.Private
public class ChunkCreator {
private final Log LOG = LogFactory.getLog(ChunkCreator.class);
// monotonically increasing chunkid
private AtomicInteger chunkID = new AtomicInteger(1);
// maps the chunk against the monotonically increasing chunk id. We need to preserve the
// natural ordering of the key
// CellChunkMap creation should convert the soft ref to hard reference
private Map<Integer, SoftReference<Chunk>> chunkIdMap =
new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
private final int chunkSize;
private final boolean offheap;
@VisibleForTesting
static ChunkCreator INSTANCE;
@VisibleForTesting
static boolean chunkPoolDisabled = false;
private MemStoreChunkPool pool;
@VisibleForTesting
ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage,
float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
this.chunkSize = chunkSize;
this.offheap = offheap;
this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage);
if (heapMemoryManager != null && this.pool != null) {
// Register with Heap Memory manager
heapMemoryManager.registerTuneObserver(this.pool);
}
}
/**
* Initializes the instance of MSLABChunkCreator
* @param chunkSize the chunkSize
* @param offheap indicates if the chunk is to be created offheap or not
* @param globalMemStoreSize the global memstore size
* @param poolSizePercentage pool size percentage
* @param initialCountPercentage the initial count of the chunk pool if any
* @param heapMemoryManager the heapmemory manager
* @return singleton MSLABChunkCreator
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
justification = "Method is called by single thread at the starting of RS")
@VisibleForTesting
public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize,
float poolSizePercentage, float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
if (INSTANCE != null) return INSTANCE;
INSTANCE = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
initialCountPercentage, heapMemoryManager);
return INSTANCE;
}
static ChunkCreator getInstance() {
return INSTANCE;
}
/**
* Creates and inits a chunk.
* @return the chunk that was initialized
*/
Chunk getChunk() {
Chunk chunk = null;
if (pool != null) {
// the pool creates the chunk internally. The chunk#init() call happens here
chunk = this.pool.getChunk();
// the pool has run out of maxCount
if (chunk == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("The chunk pool is full. Reached maxCount= " + this.pool.getMaxCount()
+ ". Creating chunk onheap.");
}
}
}
if (chunk == null) {
chunk = createChunk();
}
// put this chunk into the chunkIdMap
this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
// now we need to actually do the expensive memory allocation step in case of a new chunk,
// else only the offset is set to the beginning of the chunk to accept allocations
chunk.init();
return chunk;
}
private Chunk createChunk() {
return createChunk(false);
}
/**
* Creates the chunk either onheap or offheap
* @param pool indicates if the chunks have to be created which will be used by the Pool
* @return the chunk
*/
private Chunk createChunk(boolean pool) {
int id = chunkID.getAndIncrement();
assert id > 0;
// do not create offheap chunk on demand
if (pool && this.offheap) {
return new OffheapChunk(chunkSize, id, pool);
} else {
return new OnheapChunk(chunkSize, id, pool);
}
}
@VisibleForTesting
// TODO : To be used by CellChunkMap
Chunk getChunk(int id) {
SoftReference<Chunk> ref = chunkIdMap.get(id);
if (ref != null) {
return ref.get();
}
return null;
}
int getChunkSize() {
return this.chunkSize;
}
boolean isOffheap() {
return this.offheap;
}
private void removeChunks(Set<Integer> chunkIDs) {
this.chunkIdMap.keySet().removeAll(chunkIDs);
}
Chunk removeChunk(int chunkId) {
SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
if (ref != null) {
return ref.get();
}
return null;
}
@VisibleForTesting
int size() {
return this.chunkIdMap.size();
}
@VisibleForTesting
void clearChunkIds() {
this.chunkIdMap.clear();
}
/**
* A pool of {@link Chunk} instances.
*
* MemStoreChunkPool caches a number of retired chunks for reusing, it could
* decrease allocating bytes when writing, thereby optimizing the garbage
* collection on JVM.
*/
private class MemStoreChunkPool implements HeapMemoryTuneObserver {
private int maxCount;
// A queue of reclaimed chunks
private final BlockingQueue<Chunk> reclaimedChunks;
private final float poolSizePercentage;
/** Statistics thread schedule pool */
private final ScheduledExecutorService scheduleThreadPool;
/** Statistics thread */
private static final int statThreadPeriod = 60 * 5;
private final AtomicLong chunkCount = new AtomicLong();
private final AtomicLong reusedChunkCount = new AtomicLong();
MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) {
this.maxCount = maxCount;
this.poolSizePercentage = poolSizePercentage;
this.reclaimedChunks = new LinkedBlockingQueue<>();
for (int i = 0; i < initialCount; i++) {
Chunk chunk = createChunk(true);
chunk.init();
reclaimedChunks.add(chunk);
}
chunkCount.set(initialCount);
final String n = Thread.currentThread().getName();
scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
statThreadPeriod, TimeUnit.SECONDS);
}
/**
* Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have
* not yet created max allowed chunks count. When we have already created max allowed chunks and
* no free chunks as of now, return null. It is the responsibility of the caller to make a chunk
* then.
* Note: Chunks returned by this pool must be put back to the pool after its use.
* @return a chunk
* @see #putbackChunks(Set)
*/
Chunk getChunk() {
Chunk chunk = reclaimedChunks.poll();
if (chunk != null) {
chunk.reset();
reusedChunkCount.incrementAndGet();
} else {
// Make a chunk iff we have not yet created the maxCount chunks
while (true) {
long created = this.chunkCount.get();
if (created < this.maxCount) {
if (this.chunkCount.compareAndSet(created, created + 1)) {
chunk = createChunk(true);
break;
}
} else {
break;
}
}
}
return chunk;
}
/**
* Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining
* chunks
* @param chunks
*/
private void putbackChunks(Set<Integer> chunks) {
int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
Iterator<Integer> iterator = chunks.iterator();
while (iterator.hasNext()) {
Integer chunkId = iterator.next();
// remove the chunks every time though they are from the pool or not
Chunk chunk = ChunkCreator.this.removeChunk(chunkId);
if (chunk != null) {
if (chunk.isFromPool() && toAdd > 0) {
reclaimedChunks.add(chunk);
}
toAdd--;
}
}
}
private class StatisticsThread extends Thread {
StatisticsThread() {
super("MemStoreChunkPool.StatisticsThread");
setDaemon(true);
}
@Override
public void run() {
logStats();
}
private void logStats() {
if (!LOG.isDebugEnabled()) return;
long created = chunkCount.get();
long reused = reusedChunkCount.get();
long total = created + reused;
LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+ ",created chunk count=" + created
+ ",reused chunk count=" + reused
+ ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
(float) reused / (float) total, 2)));
}
}
private int getMaxCount() {
return this.maxCount;
}
@Override
public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
// don't do any tuning in case of offheap memstore
if (isOffheap()) {
LOG.warn("Not tuning the chunk pool as it is offheap");
return;
}
int newMaxCount =
(int) (newMemstoreSize * poolSizePercentage / getChunkSize());
if (newMaxCount != this.maxCount) {
// We need an adjustment in the chunks numbers
if (newMaxCount > this.maxCount) {
// Max chunks getting increased. Just change the variable. Later calls to getChunk() would
// create and add them to Q
LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
this.maxCount = newMaxCount;
} else {
// Max chunks getting decreased. We may need to clear off some of the pooled chunks now
// itself. If the extra chunks are serving already, do not pool those when we get them back
LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
this.maxCount = newMaxCount;
if (this.reclaimedChunks.size() > newMaxCount) {
synchronized (this) {
while (this.reclaimedChunks.size() > newMaxCount) {
this.reclaimedChunks.poll();
}
}
}
}
}
}
}
@VisibleForTesting
static void clearDisableFlag() {
chunkPoolDisabled = false;
}
private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage,
float initialCountPercentage) {
if (poolSizePercentage <= 0) {
LOG.info("PoolSizePercentage is less than 0. So not using pool");
return null;
}
if (chunkPoolDisabled) {
return null;
}
if (poolSizePercentage > 1.0) {
throw new IllegalArgumentException(
MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
}
int maxCount = (int) (globalMemStoreSize * poolSizePercentage / getChunkSize());
if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
throw new IllegalArgumentException(
MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
}
int initialCount = (int) (initialCountPercentage * maxCount);
LOG.info("Allocating MemStoreChunkPool with chunk size "
+ StringUtils.byteDesc(getChunkSize()) + ", max count " + maxCount
+ ", initial count " + initialCount);
return new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage);
}
@VisibleForTesting
int getMaxCount() {
if (pool != null) {
return pool.getMaxCount();
}
return 0;
}
@VisibleForTesting
int getPoolSize() {
if (pool != null) {
return pool.reclaimedChunks.size();
}
return 0;
}
/*
* Only used in testing
*/
@VisibleForTesting
void clearChunksInPool() {
if (pool != null) {
pool.reclaimedChunks.clear();
}
}
synchronized void putbackChunks(Set<Integer> chunks) {
if (pool != null) {
pool.putbackChunks(chunks);
} else {
this.removeChunks(chunks);
}
}
}

View File

@ -1472,7 +1472,7 @@ public class HRegionServer extends HasThread implements
startServiceThreads();
startHeapMemoryManager();
// Call it after starting HeapMemoryManager.
initializeMemStoreChunkPool();
initializeMemStoreChunkCreator();
LOG.info("Serving as " + this.serverName +
", RpcServer on " + rpcServices.isa +
", sessionid=0x" +
@ -1492,7 +1492,7 @@ public class HRegionServer extends HasThread implements
}
}
private void initializeMemStoreChunkPool() {
protected void initializeMemStoreChunkCreator() {
if (MemStoreLAB.isEnabled(conf)) {
// MSLAB is enabled. So initialize MemStoreChunkPool
// By this time, the MemstoreFlusher is already initialized. We can get the global limits from
@ -1506,12 +1506,10 @@ public class HRegionServer extends HasThread implements
float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
initialCountPercentage, chunkSize, offheap);
if (pool != null && this.hMemManager != null) {
// Register with Heap Memory manager
this.hMemManager.registerTuneObserver(pool);
}
// init the chunkCreator
ChunkCreator chunkCreator =
ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
initialCountPercentage, this.hMemManager);
}
}

View File

@ -1,265 +0,0 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A pool of {@link Chunk} instances.
*
* MemStoreChunkPool caches a number of retired chunks for reusing, it could
* decrease allocating bytes when writing, thereby optimizing the garbage
* collection on JVM.
*
* The pool instance is globally unique and could be obtained through
* {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)}
*
* {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating
* bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
* when MemStore clearing snapshot for flush
*/
@SuppressWarnings("javadoc")
@InterfaceAudience.Private
public class MemStoreChunkPool implements HeapMemoryTuneObserver {
private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
// Static reference to the MemStoreChunkPool
static MemStoreChunkPool GLOBAL_INSTANCE;
/** Boolean whether we have disabled the memstore chunk pool entirely. */
static boolean chunkPoolDisabled = false;
private int maxCount;
// A queue of reclaimed chunks
private final BlockingQueue<Chunk> reclaimedChunks;
private final int chunkSize;
private final float poolSizePercentage;
/** Statistics thread schedule pool */
private final ScheduledExecutorService scheduleThreadPool;
/** Statistics thread */
private static final int statThreadPeriod = 60 * 5;
private final AtomicLong chunkCount = new AtomicLong();
private final AtomicLong reusedChunkCount = new AtomicLong();
private final boolean offheap;
MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage,
boolean offheap) {
this.maxCount = maxCount;
this.chunkSize = chunkSize;
this.poolSizePercentage = poolSizePercentage;
this.offheap = offheap;
this.reclaimedChunks = new LinkedBlockingQueue<>();
for (int i = 0; i < initialCount; i++) {
Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize);
chunk.init();
reclaimedChunks.add(chunk);
}
chunkCount.set(initialCount);
final String n = Thread.currentThread().getName();
scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
statThreadPeriod, TimeUnit.SECONDS);
}
/**
* Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have
* not yet created max allowed chunks count. When we have already created max allowed chunks and
* no free chunks as of now, return null. It is the responsibility of the caller to make a chunk
* then.
* Note: Chunks returned by this pool must be put back to the pool after its use.
* @return a chunk
* @see #putbackChunk(Chunk)
* @see #putbackChunks(BlockingQueue)
*/
Chunk getChunk() {
Chunk chunk = reclaimedChunks.poll();
if (chunk != null) {
chunk.reset();
reusedChunkCount.incrementAndGet();
} else {
// Make a chunk iff we have not yet created the maxCount chunks
while (true) {
long created = this.chunkCount.get();
if (created < this.maxCount) {
chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize);
if (this.chunkCount.compareAndSet(created, created + 1)) {
break;
}
} else {
break;
}
}
}
return chunk;
}
/**
* Add the chunks to the pool, when the pool achieves the max size, it will
* skip the remaining chunks
* @param chunks
*/
synchronized void putbackChunks(BlockingQueue<Chunk> chunks) {
int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
Chunk chunk = null;
while ((chunk = chunks.poll()) != null && toAdd > 0) {
reclaimedChunks.add(chunk);
toAdd--;
}
}
/**
* Add the chunk to the pool, if the pool has achieved the max size, it will
* skip it
* @param chunk
*/
synchronized void putbackChunk(Chunk chunk) {
if (reclaimedChunks.size() < this.maxCount) {
reclaimedChunks.add(chunk);
}
}
int getPoolSize() {
return this.reclaimedChunks.size();
}
/*
* Only used in testing
*/
void clearChunks() {
this.reclaimedChunks.clear();
}
private class StatisticsThread extends Thread {
StatisticsThread() {
super("MemStoreChunkPool.StatisticsThread");
setDaemon(true);
}
@Override
public void run() {
logStats();
}
private void logStats() {
if (!LOG.isDebugEnabled()) return;
long created = chunkCount.get();
long reused = reusedChunkCount.get();
long total = created + reused;
LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+ ",created chunk count=" + created
+ ",reused chunk count=" + reused
+ ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
(float) reused / (float) total, 2)));
}
}
/**
* @return the global MemStoreChunkPool instance
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
justification = "Method is called by single thread at the starting of RS")
static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage,
float initialCountPercentage, int chunkSize, boolean offheap) {
if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
if (chunkPoolDisabled) return null;
if (poolSizePercentage <= 0) {
chunkPoolDisabled = true;
return null;
}
if (poolSizePercentage > 1.0) {
throw new IllegalArgumentException(
MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
}
int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
throw new IllegalArgumentException(
MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
}
int initialCount = (int) (initialCountPercentage * maxCount);
LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
+ ", max count " + maxCount + ", initial count " + initialCount);
GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage,
offheap);
return GLOBAL_INSTANCE;
}
/**
* @return The singleton instance of this pool.
*/
static MemStoreChunkPool getPool() {
return GLOBAL_INSTANCE;
}
int getMaxCount() {
return this.maxCount;
}
@VisibleForTesting
static void clearDisableFlag() {
chunkPoolDisabled = false;
}
@Override
public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
// don't do any tuning in case of offheap memstore
if (this.offheap) {
LOG.warn("Not tuning the chunk pool as it is offheap");
return;
}
int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
if (newMaxCount != this.maxCount) {
// We need an adjustment in the chunks numbers
if (newMaxCount > this.maxCount) {
// Max chunks getting increased. Just change the variable. Later calls to getChunk() would
// create and add them to Q
LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
this.maxCount = newMaxCount;
} else {
// Max chunks getting decreased. We may need to clear off some of the pooled chunks now
// itself. If the extra chunks are serving already, do not pool those when we get them back
LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
this.maxCount = newMaxCount;
if (this.reclaimedChunks.size() > newMaxCount) {
synchronized (this) {
while (this.reclaimedChunks.size() > newMaxCount) {
this.reclaimedChunks.poll();
}
}
}
}
}
}
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
* <p>
* The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks from
* and then doles it out to threads that request slices into the array. These chunks can get pooled
* as well. See {@link MemStoreChunkPool}.
* as well. See {@link ChunkCreator}.
* <p>
* The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all
* Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that
@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
* {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this
* cell's data and copies into this area and then recreate a Cell over this copied data.
* <p>
* @see MemStoreChunkPool
* @see ChunkCreator
*/
@InterfaceAudience.Private
public interface MemStoreLAB {

View File

@ -18,23 +18,26 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* A memstore-local allocation buffer.
* <p>
@ -55,8 +58,8 @@ import com.google.common.base.Preconditions;
* would provide a performance improvement - probably would speed up the
* Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
* anyway.
* The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}.
* When the Chunk comes pool, it can be either an on heap or an off heap backed chunk. The chunks,
* The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
* When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks,
* which this MemStoreLAB creates on its own (when no chunk available from pool), those will be
* always on heap backed.
*/
@ -66,14 +69,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class);
private AtomicReference<Chunk> curChunk = new AtomicReference<>();
// A queue of chunks from pool contained by this memstore LAB
// TODO: in the future, it would be better to have List implementation instead of Queue,
// as FIFO order is not so important here
// Lock to manage multiple handlers requesting for a chunk
private ReentrantLock lock = new ReentrantLock();
// A set of chunks contained by this memstore LAB
@VisibleForTesting
BlockingQueue<Chunk> pooledChunkQueue = null;
Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
private final int chunkSize;
private final int maxAlloc;
private final MemStoreChunkPool chunkPool;
private final ChunkCreator chunkCreator;
// This flag is for closing this instance, its set when clearing snapshot of
// memstore
@ -92,20 +96,12 @@ public class MemStoreLABImpl implements MemStoreLAB {
public MemStoreLABImpl(Configuration conf) {
chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
this.chunkPool = MemStoreChunkPool.getPool();
// currently chunkQueue is only used for chunkPool
if (this.chunkPool != null) {
// set queue length to chunk pool max count to avoid keeping reference of
// too many non-reclaimable chunks
pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount());
}
this.chunkCreator = ChunkCreator.getInstance();
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument(maxAlloc <= chunkSize,
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
}
@Override
public Cell copyCellInto(Cell cell) {
int size = KeyValueUtil.length(cell);
@ -118,19 +114,52 @@ public class MemStoreLABImpl implements MemStoreLAB {
Chunk c = null;
int allocOffset = 0;
while (true) {
// Try to get the chunk
c = getOrMakeChunk();
// we may get null because the some other thread succeeded in getting the lock
// and so the current thread has to try again to make its chunk or grab the chunk
// that the other thread created
// Try to allocate from this chunk
allocOffset = c.alloc(size);
if (allocOffset != -1) {
// We succeeded - this is the common case - small alloc
// from a big buffer
break;
if (c != null) {
allocOffset = c.alloc(size);
if (allocOffset != -1) {
// We succeeded - this is the common case - small alloc
// from a big buffer
break;
}
// not enough space!
// try to retire this chunk
tryRetireChunk(c);
}
// not enough space!
// try to retire this chunk
tryRetireChunk(c);
}
return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size);
return copyToChunkCell(cell, c.getData(), allocOffset, size);
}
/**
* Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
* out of it
*/
private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
int tagsLen = cell.getTagsLength();
if (cell instanceof ExtendedCell) {
((ExtendedCell) cell).write(buf, offset);
} else {
// Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
// other case also. The data fragments within Cell is copied into buf as in KeyValue
// serialization format only.
KeyValueUtil.appendTo(cell, buf, offset, true);
}
// TODO : write the seqid here. For writing seqId we should create a new cell type so
// that seqId is not used as the state
if (tagsLen == 0) {
// When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
// which directly return tagsLen as 0. So we avoid parsing many length components in
// reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
// call getTagsLength().
return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
} else {
return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
}
}
/**
@ -142,9 +171,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
this.closed = true;
// We could put back the chunks to pool for reusing only when there is no
// opening scanner which will read their data
if (chunkPool != null && openScannerCount.get() == 0
&& reclaimed.compareAndSet(false, true)) {
chunkPool.putbackChunks(this.pooledChunkQueue);
int count = openScannerCount.get();
if(count == 0) {
recycleChunks();
}
}
@ -162,9 +191,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
if (this.closed && chunkPool != null && count == 0
&& reclaimed.compareAndSet(false, true)) {
chunkPool.putbackChunks(this.pooledChunkQueue);
if (this.closed && count == 0) {
recycleChunks();
}
}
private void recycleChunks() {
if (reclaimed.compareAndSet(false, true)) {
chunkCreator.putbackChunks(chunks);
}
}
@ -190,45 +224,33 @@ public class MemStoreLABImpl implements MemStoreLAB {
* allocate a new one from the JVM.
*/
private Chunk getOrMakeChunk() {
while (true) {
// Try to get the chunk
Chunk c = curChunk.get();
if (c != null) {
return c;
}
// No current chunk, so we want to allocate one. We race
// against other allocators to CAS in an uninitialized chunk
// (which is cheap to allocate)
if (chunkPool != null) {
c = chunkPool.getChunk();
}
boolean pooledChunk = false;
if (c != null) {
// This is chunk from pool
pooledChunk = true;
} else {
c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap.
}
if (curChunk.compareAndSet(null, c)) {
// we won race - now we need to actually do the expensive
// allocation step
c.init();
if (pooledChunk) {
if (!this.closed && !this.pooledChunkQueue.offer(c)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+ pooledChunkQueue.size());
}
}
}
return c;
} else if (pooledChunk) {
chunkPool.putbackChunk(c);
}
// someone else won race - that's fine, we'll try to grab theirs
// in the next iteration of the loop.
// Try to get the chunk
Chunk c = curChunk.get();
if (c != null) {
return c;
}
// No current chunk, so we want to allocate one. We race
// against other allocators to CAS in an uninitialized chunk
// (which is cheap to allocate)
if (lock.tryLock()) {
try {
// once again check inside the lock
c = curChunk.get();
if (c != null) {
return c;
}
c = this.chunkCreator.getChunk();
if (c != null) {
// set the curChunk. No need of CAS as only one thread will be here
curChunk.set(c);
chunks.add(c.getId());
return c;
}
} finally {
lock.unlock();
}
}
return null;
}
@VisibleForTesting
@ -236,8 +258,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
return this.curChunk.get();
}
@VisibleForTesting
BlockingQueue<Chunk> getPooledChunks() {
return this.pooledChunkQueue;
BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
for (Integer id : this.chunks) {
Chunk chunk = chunkCreator.getChunk(id);
if (chunk != null && chunk.isFromPool()) {
pooledChunks.add(chunk);
}
}
return pooledChunks;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
/**
* ByteBuffer based cell which has the chunkid at the 0th offset and with no tags
* @see MemStoreLAB
*/
@InterfaceAudience.Private
public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue {
public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
super(buf, offset, length);
}
public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
super(buf, offset, length, seqId);
}
@Override
public int getChunkId() {
// The chunkId is embedded at the 0th offset of the bytebuffer
return ByteBufferUtils.toInt(buf, 0);
}
}

View File

@ -21,34 +21,27 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.base.Preconditions;
/**
* An off heap chunk implementation.
*/
@InterfaceAudience.Private
public class OffheapChunk extends Chunk {
OffheapChunk(int size) {
super(size);
OffheapChunk(int size, int id) {
// better if this is always created fromPool. This should not be called
super(size, id);
}
OffheapChunk(int size, int id, boolean fromPool) {
super(size, id, fromPool);
assert fromPool == true;
}
@Override
public void init() {
assert nextFreeOffset.get() == UNINITIALIZED;
try {
if (data == null) {
data = ByteBuffer.allocateDirect(this.size);
}
} catch (OutOfMemoryError e) {
boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
assert failInit; // should be true.
throw e;
void allocateDataBuffer() {
if (data == null) {
data = ByteBuffer.allocateDirect(this.size);
data.putLong(0, this.getId());
}
// Mark that it's ready for use
boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
// We should always succeed the above CAS since only one thread
// calls init()!
Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
}
}

View File

@ -21,33 +21,25 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.base.Preconditions;
/**
* An on heap chunk implementation.
*/
@InterfaceAudience.Private
public class OnheapChunk extends Chunk {
OnheapChunk(int size) {
super(size);
OnheapChunk(int size, int id) {
super(size, id);
}
public void init() {
assert nextFreeOffset.get() == UNINITIALIZED;
try {
if (data == null) {
data = ByteBuffer.allocate(this.size);
}
} catch (OutOfMemoryError e) {
boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
assert failInit; // should be true.
throw e;
}
// Mark that it's ready for use
boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
// We should always succeed the above CAS since only one thread
// calls init()!
Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
OnheapChunk(int size, int id, boolean fromPool) {
super(size, id, fromPool);
}
}
@Override
void allocateDataBuffer() {
if (data == null) {
data = ByteBuffer.allocate(this.size);
data.putLong(0, this.getId());
}
}
}

View File

@ -96,6 +96,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@ -2426,6 +2428,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
final Configuration conf, final HTableDescriptor htd, boolean initialize)
throws IOException {
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
WAL wal = createWal(conf, rootDir, info);
return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
}

View File

@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -49,8 +50,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@ -397,6 +400,7 @@ public class TestCoprocessorInterface {
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegionInfo info = new HRegionInfo(tableName, null, null, false);
Path path = new Path(DIR + callingMethod);
Region r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);

View File

@ -47,10 +47,12 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@ -152,6 +154,7 @@ public class TestRegionObserverScannerOpenHook {
for (byte[] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
Path path = new Path(DIR + callingMethod);
WAL wal = HBaseTestingUtility.createWal(conf, path, info);

View File

@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@ -100,6 +102,7 @@ public class TestRegionObserverStacking extends TestCase {
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
Path path = new Path(DIR + callingMethod);
HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);

View File

@ -40,8 +40,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -342,6 +344,7 @@ public class TestScannerFromBucketCache {
private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
byte[]... families) throws IOException {
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log");
HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);

View File

@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -73,6 +75,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Triple;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -88,6 +91,10 @@ public class TestCatalogJanitor {
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setup() throws Exception {
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
}
/**
* Mock MasterServices for tests below.
*/

View File

@ -241,7 +241,7 @@ public class TestBulkLoad {
for (byte[] family : families) {
hTableDescriptor.addFamily(new HColumnDescriptor(family));
}
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
// TODO We need a way to do this without creating files
return HRegion.createHRegion(hRegionInfo,
new Path(testFolder.newFolder().toURI()),

View File

@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase {
descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true);
CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
MemStoreChunkPool.chunkPoolDisabled = false;
ChunkCreator.chunkPoolDisabled = false;
}
/* Create and test CellSet based on CellArrayMap */

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -50,7 +51,7 @@ import static org.junit.Assert.assertTrue;
public class TestCompactingMemStore extends TestDefaultMemStore {
private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class);
protected static MemStoreChunkPool chunkPool;
protected static ChunkCreator chunkCreator;
protected HRegion region;
protected RegionServicesForStores regionServicesForStores;
protected HStore store;
@ -65,7 +66,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@After
public void tearDown() throws Exception {
chunkPool.clearChunks();
chunkCreator.clearChunksInPool();
}
@Override
@ -84,15 +85,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
this.region = hbaseUtility.createTestRegion("foobar", hcd);
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar"));
htd.addFamily(hcd);
HRegionInfo info =
new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
//this.region = hbaseUtility.createTestRegion("foobar", hcd);
this.regionServicesForStores = region.getRegionServicesForStores();
this.store = new HStore(region, hcd, conf);
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
assertTrue(chunkPool != null);
chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
assertTrue(chunkCreator != null);
}
/**
@ -390,7 +397,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
}
@ -434,16 +441,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
assertTrue(chunkPool.getPoolSize() > 0);
assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
chunkPool.clearChunks();
chunkCreator.clearChunksInPool();
// Creating another snapshot
@ -464,7 +471,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
@ -516,16 +523,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
assertEquals(3, memstore.getActive().getCellsCount());
assertTrue(chunkPool.getPoolSize() == 0);
assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
assertTrue(chunkPool.getPoolSize() > 0);
assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
chunkPool.clearChunks();
chunkCreator.clearChunksInPool();
// Creating another snapshot
@ -553,7 +560,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
assertTrue(chunkCreator.getPoolSize() > 0);
}
//////////////////////////////////////////////////////////////////////////////

View File

@ -44,17 +44,13 @@ import java.util.List;
public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore {
private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class);
//private static MemStoreChunkPool chunkPool;
//private HRegion region;
//private RegionServicesForStores regionServicesForStores;
//private HStore store;
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
@Override public void tearDown() throws Exception {
chunkPool.clearChunks();
chunkCreator.clearChunksInPool();
}
@Override public void setUp() throws Exception {
@ -408,16 +404,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
assertTrue(chunkPool.getPoolSize() > 0);
assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
chunkPool.clearChunks();
chunkCreator.clearChunksInPool();
// Creating another snapshot
@ -438,7 +434,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
@ -472,7 +468,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
}

View File

@ -164,6 +164,7 @@ public class TestCompactionArchiveConcurrentClose {
HRegionFileSystem fs = new WaitingHRegionFileSystem(conf, tableDir.getFileSystem(conf),
tableDir, info);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, tableDir);
final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName());

View File

@ -174,6 +174,7 @@ public class TestCompactionArchiveIOException {
private HRegion initHRegion(HTableDescriptor htd, HRegionInfo info)
throws IOException {
Configuration conf = testUtil.getConfiguration();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
Path regionDir = new Path(tableDir, info.getEncodedName());
Path storeDir = new Path(regionDir, htd.getColumnFamilies()[0].getNameAsString());

View File

@ -104,6 +104,7 @@ public class TestCompactionPolicy {
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
hlog = new FSHLog(fs, basedir, logName, conf);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
region = HRegion.createHRegion(info, basedir, conf, htd, hlog);
region.close();
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -84,6 +85,7 @@ public class TestDefaultMemStore {
protected static final byte[] FAMILY = Bytes.toBytes("column");
protected MultiVersionConcurrencyControl mvcc;
protected AtomicLong startSeqNum = new AtomicLong(0);
protected ChunkCreator chunkCreator;
private String getName() {
return this.name.getMethodName();
@ -92,9 +94,17 @@ public class TestDefaultMemStore {
@Before
public void setUp() throws Exception {
internalSetUp();
// no pool
this.chunkCreator =
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
this.memstore = new DefaultMemStore();
}
@AfterClass
public static void tearDownClass() throws Exception {
ChunkCreator.getInstance().clearChunkIds();
}
protected void internalSetUp() throws Exception {
this.mvcc = new MultiVersionConcurrencyControl();
}
@ -129,7 +139,9 @@ public class TestDefaultMemStore {
assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize());
// make sure chunk size increased even when writing the same cell, if using MSLAB
if (msLab instanceof MemStoreLABImpl) {
assertEquals(2 * Segment.getCellLength(kv),
// since we add the chunkID at the 0th offset of the chunk and the
// chunkid is a long we need to account for those 8 bytes
assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG,
((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset());
}
} else {

View File

@ -266,6 +266,7 @@ public class TestFailedAppendAndSync {
*/
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
wal, COLUMN_FAMILY_BYTES);
}

View File

@ -153,7 +153,7 @@ public class TestHMobStore {
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, basedir);
final WALFactory wals = new WALFactory(walConf, null, methodName);

View File

@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
@ -4931,6 +4932,7 @@ public class TestHRegion {
String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
throws IOException {
Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
return initHRegion(tableName, startKey, stopKey, isReadOnly,

View File

@ -153,7 +153,7 @@ public class TestHRegionReplayEvents {
}
time = System.currentTimeMillis();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
primaryHri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
false, time, 0);

View File

@ -48,30 +48,30 @@ import static org.junit.Assert.assertTrue;
@Category({RegionServerTests.class, SmallTests.class})
public class TestMemStoreChunkPool {
private final static Configuration conf = new Configuration();
private static MemStoreChunkPool chunkPool;
private static ChunkCreator chunkCreator;
private static boolean chunkPoolDisabledBeforeTest;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled;
MemStoreChunkPool.chunkPoolDisabled = false;
chunkPoolDisabledBeforeTest = ChunkCreator.chunkPoolDisabled;
ChunkCreator.chunkPoolDisabled = false;
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
assertTrue(chunkPool != null);
chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
assertTrue(chunkCreator != null);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest;
ChunkCreator.chunkPoolDisabled = chunkPoolDisabledBeforeTest;
}
@Before
public void tearDown() throws Exception {
chunkPool.clearChunks();
chunkCreator.clearChunksInPool();
}
@Test
@ -90,7 +90,7 @@ public class TestMemStoreChunkPool {
int size = KeyValueUtil.length(kv);
ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
if (newKv.getBuffer() != lastBuffer) {
expectedOff = 0;
expectedOff = 8;
lastBuffer = newKv.getBuffer();
}
assertEquals(expectedOff, newKv.getOffset());
@ -100,14 +100,14 @@ public class TestMemStoreChunkPool {
}
// chunks will be put back to pool after close
mslab.close();
int chunkCount = chunkPool.getPoolSize();
int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
// reconstruct mslab
mslab = new MemStoreLABImpl(conf);
// chunk should be got from the pool, so we can reuse it.
KeyValue kv = new KeyValue(rk, cf, q, new byte[10]);
mslab.copyCellInto(kv);
assertEquals(chunkCount - 1, chunkPool.getPoolSize());
assertEquals(chunkCount - 1, chunkCreator.getPoolSize());
}
@Test
@ -143,7 +143,7 @@ public class TestMemStoreChunkPool {
}
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
}
@ -189,16 +189,16 @@ public class TestMemStoreChunkPool {
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
assertTrue(chunkPool.getPoolSize() > 0);
assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
chunkPool.clearChunks();
chunkCreator.clearChunksInPool();
// Creating another snapshot
snapshot = memstore.snapshot();
@ -218,20 +218,20 @@ public class TestMemStoreChunkPool {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
public void testPutbackChunksMultiThreaded() throws Exception {
MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE;
final int maxCount = 10;
final int initialCount = 5;
final int chunkSize = 30;
final int chunkSize = 40;
final int valSize = 7;
MemStoreChunkPool pool = new MemStoreChunkPool(chunkSize, maxCount, initialCount, 1, false);
assertEquals(initialCount, pool.getPoolSize());
assertEquals(maxCount, pool.getMaxCount());
MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.
ChunkCreator oldCreator = ChunkCreator.getInstance();
ChunkCreator newCreator = new ChunkCreator(chunkSize, false, 400, 1, 0.5f, null);
assertEquals(initialCount, newCreator.getPoolSize());
assertEquals(maxCount, newCreator.getMaxCount());
ChunkCreator.INSTANCE = newCreator;// Replace the global ref with the new one we created.
// Used it for the testing. Later in finally we put
// back the original
final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
@ -258,9 +258,9 @@ public class TestMemStoreChunkPool {
t1.join();
t2.join();
t3.join();
assertTrue(pool.getPoolSize() <= maxCount);
assertTrue(newCreator.getPoolSize() <= maxCount);
} finally {
MemStoreChunkPool.GLOBAL_INSTANCE = oldPool;
ChunkCreator.INSTANCE = oldCreator;
}
}
}

View File

@ -63,8 +63,8 @@ public class TestMemStoreLAB {
public static void setUpBeforeClass() throws Exception {
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit,
0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
}
/**
@ -76,6 +76,7 @@ public class TestMemStoreLAB {
MemStoreLAB mslab = new MemStoreLABImpl();
int expectedOff = 0;
ByteBuffer lastBuffer = null;
long lastChunkId = -1;
// 100K iterations by 0-1K alloc -> 50MB expected
// should be reasonable for unit test and also cover wraparound
// behavior
@ -85,8 +86,13 @@ public class TestMemStoreLAB {
int size = KeyValueUtil.length(kv);
ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
if (newKv.getBuffer() != lastBuffer) {
expectedOff = 0;
// since we add the chunkID at the 0th offset of the chunk and the
// chunkid is a long we need to account for those 8 bytes
expectedOff = Bytes.SIZEOF_LONG;
lastBuffer = newKv.getBuffer();
long chunkId = newKv.getBuffer().getLong(0);
assertTrue("chunkid should be different", chunkId != lastChunkId);
lastChunkId = chunkId;
}
assertEquals(expectedOff, newKv.getOffset());
assertTrue("Allocation overruns buffer",
@ -136,23 +142,21 @@ public class TestMemStoreLAB {
};
ctx.addThread(t);
}
ctx.startThreads();
while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) {
Thread.sleep(10);
}
ctx.stop();
// Partition the allocations by the actual byte[] they point into,
// make sure offsets are unique for each chunk
Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk =
Maps.newHashMap();
int sizeCounted = 0;
for (AllocRecord rec : Iterables.concat(allocations)) {
sizeCounted += rec.size;
if (rec.size == 0) continue;
Map<Integer, AllocRecord> mapForThisByteArray =
mapsByChunk.get(rec.alloc);
if (mapForThisByteArray == null) {
@ -167,7 +171,9 @@ public class TestMemStoreLAB {
// Now check each byte array to make sure allocations don't overlap
for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
int expectedOff = 0;
// since we add the chunkID at the 0th offset of the chunk and the
// chunkid is a long we need to account for those 8 bytes
int expectedOff = Bytes.SIZEOF_LONG;
for (AllocRecord alloc : allocsInChunk.values()) {
assertEquals(expectedOff, alloc.offset);
assertTrue("Allocation overruns buffer",
@ -175,7 +181,6 @@ public class TestMemStoreLAB {
expectedOff += alloc.size;
}
}
}
/**
@ -194,7 +199,7 @@ public class TestMemStoreLAB {
// set chunk size to default max alloc size, so we could easily trigger chunk retirement
conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
// reconstruct mslab
MemStoreChunkPool.clearDisableFlag();
ChunkCreator.clearDisableFlag();
mslab = new MemStoreLABImpl(conf);
// launch multiple threads to trigger frequent chunk retirement
List<Thread> threads = new ArrayList<>();
@ -223,6 +228,8 @@ public class TestMemStoreLAB {
}
// close the mslab
mslab.close();
// none of the chunkIds would have been returned back
assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0);
// make sure all chunks reclaimed or removed from chunk queue
int queueLength = mslab.getPooledChunks().size();
assertTrue("All chunks in chunk queue should be reclaimed or removed"

View File

@ -0,0 +1,168 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
public class TestMemstoreLABWithoutPool {
private final static Configuration conf = new Configuration();
private static final byte[] rk = Bytes.toBytes("r1");
private static final byte[] cf = Bytes.toBytes("f");
private static final byte[] q = Bytes.toBytes("q");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * 0.8);
// disable pool
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT + Bytes.SIZEOF_LONG, false, globalMemStoreLimit,
0.0f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
}
/**
* Test a bunch of random allocations
*/
@Test
public void testLABRandomAllocation() {
Random rand = new Random();
MemStoreLAB mslab = new MemStoreLABImpl();
int expectedOff = 0;
ByteBuffer lastBuffer = null;
long lastChunkId = -1;
// 100K iterations by 0-1K alloc -> 50MB expected
// should be reasonable for unit test and also cover wraparound
// behavior
for (int i = 0; i < 100000; i++) {
int valSize = rand.nextInt(1000);
KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
int size = KeyValueUtil.length(kv);
ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
if (newKv.getBuffer() != lastBuffer) {
// since we add the chunkID at the 0th offset of the chunk and the
// chunkid is a long we need to account for those 8 bytes
expectedOff = Bytes.SIZEOF_LONG;
lastBuffer = newKv.getBuffer();
long chunkId = newKv.getBuffer().getLong(0);
assertTrue("chunkid should be different", chunkId != lastChunkId);
lastChunkId = chunkId;
}
assertEquals(expectedOff, newKv.getOffset());
assertTrue("Allocation overruns buffer",
newKv.getOffset() + size <= newKv.getBuffer().capacity());
expectedOff += size;
}
}
/**
* Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
* there's no memory leak (HBASE-16195)
* @throws Exception if any error occurred
*/
@Test
public void testLABChunkQueueWithMultipleMSLABs() throws Exception {
Configuration conf = HBaseConfiguration.create();
MemStoreLABImpl[] mslab = new MemStoreLABImpl[10];
for (int i = 0; i < 10; i++) {
mslab[i] = new MemStoreLABImpl(conf);
}
// launch multiple threads to trigger frequent chunk retirement
List<Thread> threads = new ArrayList<>();
// create smaller sized kvs
final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
new byte[0]);
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
threads.add(getChunkQueueTestThread(mslab[i], "testLABChunkQueue-" + j, kv));
}
}
for (Thread thread : threads) {
thread.start();
}
// let it run for some time
Thread.sleep(3000);
for (Thread thread : threads) {
thread.interrupt();
}
boolean threadsRunning = true;
boolean alive = false;
while (threadsRunning) {
alive = false;
for (Thread thread : threads) {
if (thread.isAlive()) {
alive = true;
break;
}
}
if (!alive) {
threadsRunning = false;
}
}
// close the mslab
for (int i = 0; i < 10; i++) {
mslab[i].close();
}
// all of the chunkIds would have been returned back
assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0);
}
private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
Cell cellToCopyInto) {
Thread thread = new Thread() {
boolean stopped = false;
@Override
public void run() {
while (!stopped) {
// keep triggering chunk retirement
mslab.copyCellInto(cellToCopyInto);
}
}
@Override
public void interrupt() {
this.stopped = true;
}
};
thread.setName(threadName);
thread.setDaemon(true);
return thread;
}
}

View File

@ -108,6 +108,7 @@ public class TestRecoveredEdits {
}
};
Path hbaseRootDir = TEST_UTIL.getDataTestDir();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
Path tableDir = FSUtils.getTableDir(hbaseRootDir, htd.getTableName());
HRegionFileSystem hrfs =

View File

@ -83,6 +83,7 @@ public class TestRegionIncrement {
private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
TEST_UTIL.getDataTestDir().toString(), conf);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,
false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);

View File

@ -178,6 +178,7 @@ public class TestStore {
} else {
htd.addFamily(hcd);
}
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, basedir);

View File

@ -111,6 +111,7 @@ public class TestStoreFileRefresherChore {
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, tableDir);
final WALFactory wals = new WALFactory(walConf, null, "log_" + replicaId);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegion region =
new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()),
conf, htd, null);

View File

@ -588,6 +588,7 @@ public class TestWALLockup {
*/
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
wal, COLUMN_FAMILY_BYTES);
}

View File

@ -98,6 +98,7 @@ public class TestWALMonotonicallyIncreasingSeqId {
FSUtils.setRootDir(walConf, tableDir);
this.walConf = walConf;
wals = new WALFactory(walConf, null, "log_" + replicaId);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(),
info.getTable().getNamespace()), conf, htd, null);
region.initialize();

View File

@ -37,7 +37,9 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -288,6 +290,7 @@ public class TestDurability {
throw new IOException("Failed delete of " + path);
}
}
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return HRegion.createHRegion(info, path, CONF, htd, log);
}