diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index e1bc969fb1b..56de21bac0f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -3135,28 +3135,4 @@ public final class CellUtil { return Type.DeleteFamily.getCode(); } } - - /** - * Clone the passed cell by copying its data into the passed buf. - */ - public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) { - int tagsLen = cell.getTagsLength(); - if (cell instanceof ExtendedCell) { - ((ExtendedCell) cell).write(buf, offset); - } else { - // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the - // other case also. The data fragments within Cell is copied into buf as in KeyValue - // serialization format only. - KeyValueUtil.appendTo(cell, buf, offset, true); - } - if (tagsLen == 0) { - // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class - // which directly return tagsLen as 0. So we avoid parsing many length components in - // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell - // call getTagsLength(). - return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); - } else { - return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); - } - } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java index 517873f7c93..10f20ca49a3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.HeapSize; public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize, Cloneable { + public static int CELL_NOT_BASED_ON_CHUNK = -1; /** * Write this cell to an OutputStream in a {@link KeyValue} format. *
KeyValue format
@@ -73,4 +74,13 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam * @return The deep cloned cell */ Cell deepClone(); + + /** + * Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized + * chunks as in case of MemstoreLAB + * @return the chunk id if the cell is backed by fixed sized Chunks, else return -1 + */ + default int getChunkId() { + return CELL_NOT_BASED_ON_CHUNK; + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index bb9f282acb1..f9670e14d11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -748,6 +748,8 @@ public class HMaster extends HRegionServer implements MasterServices { this.masterActiveTime = System.currentTimeMillis(); // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. + // Initialize the chunkCreator + initializeMemStoreChunkCreator(); this.fileSystemManager = new MasterFileSystem(this); this.walManager = new MasterWalManager(this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java new file mode 100644 index 00000000000..a8f10006cb2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.ByteBufferKeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +/** + * ByteBuffer based cell which has the chunkid at the 0th offset + * @see MemStoreLAB + */ +//TODO : When moving this cell to CellChunkMap we will have the following things +// to be serialized +// chunkId (Integer) + offset (Integer) + length (Integer) + seqId (Long) = 20 bytes +@InterfaceAudience.Private +public class ByteBufferChunkCell extends ByteBufferKeyValue { + public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { + super(buf, offset, length, seqId); + } + + @Override + public int getChunkId() { + // The chunkId is embedded at the 0th offset of the bytebuffer + return ByteBufferUtils.toInt(buf, 0); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java index 2cbf0a3eb9e..fc4aa0be133 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java @@ -21,8 +21,10 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * A chunk of memory out of which allocations are sliced. @@ -46,13 +48,41 @@ public abstract class Chunk { /** Size of chunk in bytes */ protected final int size; + // The unique id associated with the chunk. + private final int id; + + // indicates if the chunk is formed by ChunkCreator#MemstorePool + private final boolean fromPool; + /** - * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap. - * + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. * @param size in bytes + * @param id the chunk id */ - Chunk(int size) { + public Chunk(int size, int id) { + this(size, id, false); + } + + /** + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. + * @param size in bytes + * @param id the chunk id + * @param fromPool if the chunk is formed by pool + */ + public Chunk(int size, int id, boolean fromPool) { this.size = size; + this.id = id; + this.fromPool = fromPool; + } + + int getId() { + return this.id; + } + + boolean isFromPool() { + return this.fromPool; } /** @@ -60,7 +90,24 @@ public abstract class Chunk { * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block * until the allocation is complete. */ - public abstract void init(); + public void init() { + assert nextFreeOffset.get() == UNINITIALIZED; + try { + allocateDataBuffer(); + } catch (OutOfMemoryError e) { + boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); + assert failInit; // should be true. + throw e; + } + // Mark that it's ready for use + // Move 8 bytes since the first 8 bytes are having the chunkid in it + boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG); + // We should always succeed the above CAS since only one thread + // calls init()! + Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); + } + + abstract void allocateDataBuffer(); /** * Reset the offset to UNINITIALIZED before before reusing an old chunk @@ -74,7 +121,8 @@ public abstract class Chunk { /** * Try to allocate size bytes from the chunk. - * + * If a chunk is tried to get allocated before init() call, the thread doing the allocation + * will be in busy-wait state as it will keep looping till the nextFreeOffset is set. * @return the offset of the successful allocation, or -1 to indicate not-enough-space */ public int alloc(int size) { @@ -96,7 +144,7 @@ public abstract class Chunk { if (oldOffset + size > data.capacity()) { return -1; // alloc doesn't fit } - + // TODO : If seqID is to be written add 8 bytes here for nextFreeOFfset // Try to atomically claim this chunk if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { // we got the alloc diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java new file mode 100644 index 00000000000..d5501489be6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -0,0 +1,404 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.lang.ref.SoftReference; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated + * with every chunk + */ +@InterfaceAudience.Private +public class ChunkCreator { + private static final Log LOG = LogFactory.getLog(ChunkCreator.class); + // monotonically increasing chunkid + private AtomicInteger chunkID = new AtomicInteger(1); + // maps the chunk against the monotonically increasing chunk id. We need to preserve the + // natural ordering of the key + // CellChunkMap creation should convert the soft ref to hard reference + private Map> chunkIdMap = + new ConcurrentHashMap>(); + private final int chunkSize; + private final boolean offheap; + @VisibleForTesting + static ChunkCreator INSTANCE; + @VisibleForTesting + static boolean chunkPoolDisabled = false; + private MemStoreChunkPool pool; + + @VisibleForTesting + ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage, + float initialCountPercentage, HeapMemoryManager heapMemoryManager) { + this.chunkSize = chunkSize; + this.offheap = offheap; + this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage); + if (heapMemoryManager != null && this.pool != null) { + // Register with Heap Memory manager + heapMemoryManager.registerTuneObserver(this.pool); + } + } + + /** + * Initializes the instance of MSLABChunkCreator + * @param chunkSize the chunkSize + * @param offheap indicates if the chunk is to be created offheap or not + * @param globalMemStoreSize the global memstore size + * @param poolSizePercentage pool size percentage + * @param initialCountPercentage the initial count of the chunk pool if any + * @param heapMemoryManager the heapmemory manager + * @return singleton MSLABChunkCreator + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", + justification = "Method is called by single thread at the starting of RS") + @VisibleForTesting + public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize, + float poolSizePercentage, float initialCountPercentage, HeapMemoryManager heapMemoryManager) { + if (INSTANCE != null) return INSTANCE; + INSTANCE = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, + initialCountPercentage, heapMemoryManager); + return INSTANCE; + } + + static ChunkCreator getInstance() { + return INSTANCE; + } + + /** + * Creates and inits a chunk. + * @return the chunk that was initialized + */ + Chunk getChunk() { + Chunk chunk = null; + if (pool != null) { + // the pool creates the chunk internally. The chunk#init() call happens here + chunk = this.pool.getChunk(); + // the pool has run out of maxCount + if (chunk == null) { + if (LOG.isTraceEnabled()) { + LOG.trace("The chunk pool is full. Reached maxCount= " + this.pool.getMaxCount() + + ". Creating chunk onheap."); + } + } + } + if (chunk == null) { + chunk = createChunk(); + } + // put this chunk into the chunkIdMap + this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk)); + // now we need to actually do the expensive memory allocation step in case of a new chunk, + // else only the offset is set to the beginning of the chunk to accept allocations + chunk.init(); + return chunk; + } + + private Chunk createChunk() { + return createChunk(false); + } + + /** + * Creates the chunk either onheap or offheap + * @param pool indicates if the chunks have to be created which will be used by the Pool + * @return the chunk + */ + private Chunk createChunk(boolean pool) { + int id = chunkID.getAndIncrement(); + assert id > 0; + // do not create offheap chunk on demand + if (pool && this.offheap) { + return new OffheapChunk(chunkSize, id, pool); + } else { + return new OnheapChunk(chunkSize, id, pool); + } + } + + @VisibleForTesting + // TODO : To be used by CellChunkMap + Chunk getChunk(int id) { + SoftReference ref = chunkIdMap.get(id); + if (ref != null) { + return ref.get(); + } + return null; + } + + int getChunkSize() { + return this.chunkSize; + } + + boolean isOffheap() { + return this.offheap; + } + + private void removeChunks(Set chunkIDs) { + this.chunkIdMap.keySet().removeAll(chunkIDs); + } + + Chunk removeChunk(int chunkId) { + SoftReference ref = this.chunkIdMap.remove(chunkId); + if (ref != null) { + return ref.get(); + } + return null; + } + + @VisibleForTesting + int size() { + return this.chunkIdMap.size(); + } + + @VisibleForTesting + void clearChunkIds() { + this.chunkIdMap.clear(); + } + + /** + * A pool of {@link Chunk} instances. + * + * MemStoreChunkPool caches a number of retired chunks for reusing, it could + * decrease allocating bytes when writing, thereby optimizing the garbage + * collection on JVM. + */ + private class MemStoreChunkPool implements HeapMemoryTuneObserver { + private int maxCount; + + // A queue of reclaimed chunks + private final BlockingQueue reclaimedChunks; + private final float poolSizePercentage; + + /** Statistics thread schedule pool */ + private final ScheduledExecutorService scheduleThreadPool; + /** Statistics thread */ + private static final int statThreadPeriod = 60 * 5; + private final AtomicLong chunkCount = new AtomicLong(); + private final AtomicLong reusedChunkCount = new AtomicLong(); + + MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { + this.maxCount = maxCount; + this.poolSizePercentage = poolSizePercentage; + this.reclaimedChunks = new LinkedBlockingQueue<>(); + for (int i = 0; i < initialCount; i++) { + Chunk chunk = createChunk(true); + chunk.init(); + reclaimedChunks.add(chunk); + } + chunkCount.set(initialCount); + final String n = Thread.currentThread().getName(); + scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, + statThreadPeriod, TimeUnit.SECONDS); + } + + /** + * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have + * not yet created max allowed chunks count. When we have already created max allowed chunks and + * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk + * then. + * Note: Chunks returned by this pool must be put back to the pool after its use. + * @return a chunk + * @see #putbackChunks(Set) + */ + Chunk getChunk() { + Chunk chunk = reclaimedChunks.poll(); + if (chunk != null) { + chunk.reset(); + reusedChunkCount.incrementAndGet(); + } else { + // Make a chunk iff we have not yet created the maxCount chunks + while (true) { + long created = this.chunkCount.get(); + if (created < this.maxCount) { + if (this.chunkCount.compareAndSet(created, created + 1)) { + chunk = createChunk(true); + break; + } + } else { + break; + } + } + } + return chunk; + } + + /** + * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining + * chunks + * @param chunks + */ + private void putbackChunks(Set chunks) { + int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); + Iterator iterator = chunks.iterator(); + while (iterator.hasNext()) { + Integer chunkId = iterator.next(); + // remove the chunks every time though they are from the pool or not + Chunk chunk = ChunkCreator.this.removeChunk(chunkId); + if (chunk != null) { + if (chunk.isFromPool() && toAdd > 0) { + reclaimedChunks.add(chunk); + } + toAdd--; + } + } + } + + private class StatisticsThread extends Thread { + StatisticsThread() { + super("MemStoreChunkPool.StatisticsThread"); + setDaemon(true); + } + + @Override + public void run() { + logStats(); + } + + private void logStats() { + if (!LOG.isDebugEnabled()) return; + long created = chunkCount.get(); + long reused = reusedChunkCount.get(); + long total = created + reused; + LOG.debug("Stats: current pool size=" + reclaimedChunks.size() + + ",created chunk count=" + created + + ",reused chunk count=" + reused + + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( + (float) reused / (float) total, 2))); + } + } + + private int getMaxCount() { + return this.maxCount; + } + + @Override + public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { + // don't do any tuning in case of offheap memstore + if (isOffheap()) { + LOG.warn("Not tuning the chunk pool as it is offheap"); + return; + } + int newMaxCount = + (int) (newMemstoreSize * poolSizePercentage / getChunkSize()); + if (newMaxCount != this.maxCount) { + // We need an adjustment in the chunks numbers + if (newMaxCount > this.maxCount) { + // Max chunks getting increased. Just change the variable. Later calls to getChunk() would + // create and add them to Q + LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + } else { + // Max chunks getting decreased. We may need to clear off some of the pooled chunks now + // itself. If the extra chunks are serving already, do not pool those when we get them back + LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + if (this.reclaimedChunks.size() > newMaxCount) { + synchronized (this) { + while (this.reclaimedChunks.size() > newMaxCount) { + this.reclaimedChunks.poll(); + } + } + } + } + } + } + } + + @VisibleForTesting + static void clearDisableFlag() { + chunkPoolDisabled = false; + } + + private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage, + float initialCountPercentage) { + if (poolSizePercentage <= 0) { + LOG.info("PoolSizePercentage is less than 0. So not using pool"); + return null; + } + if (chunkPoolDisabled) { + return null; + } + if (poolSizePercentage > 1.0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); + } + int maxCount = (int) (globalMemStoreSize * poolSizePercentage / getChunkSize()); + if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); + } + int initialCount = (int) (initialCountPercentage * maxCount); + LOG.info("Allocating MemStoreChunkPool with chunk size " + + StringUtils.byteDesc(getChunkSize()) + ", max count " + maxCount + + ", initial count " + initialCount); + return new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage); + } + + @VisibleForTesting + int getMaxCount() { + if (pool != null) { + return pool.getMaxCount(); + } + return 0; + } + + @VisibleForTesting + int getPoolSize() { + if (pool != null) { + return pool.reclaimedChunks.size(); + } + return 0; + } + + /* + * Only used in testing + */ + @VisibleForTesting + void clearChunksInPool() { + if (pool != null) { + pool.reclaimedChunks.clear(); + } + } + + synchronized void putbackChunks(Set chunks) { + if (pool != null) { + pool.putbackChunks(chunks); + } else { + this.removeChunks(chunks); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d14571b6e38..c1974187979 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1472,7 +1472,7 @@ public class HRegionServer extends HasThread implements startServiceThreads(); startHeapMemoryManager(); // Call it after starting HeapMemoryManager. - initializeMemStoreChunkPool(); + initializeMemStoreChunkCreator(); LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + ", sessionid=0x" + @@ -1492,7 +1492,7 @@ public class HRegionServer extends HasThread implements } } - private void initializeMemStoreChunkPool() { + protected void initializeMemStoreChunkCreator() { if (MemStoreLAB.isEnabled(conf)) { // MSLAB is enabled. So initialize MemStoreChunkPool // By this time, the MemstoreFlusher is already initialized. We can get the global limits from @@ -1506,12 +1506,10 @@ public class HRegionServer extends HasThread implements float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); - MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage, - initialCountPercentage, chunkSize, offheap); - if (pool != null && this.hMemManager != null) { - // Register with Heap Memory manager - this.hMemManager.registerTuneObserver(pool); - } + // init the chunkCreator + ChunkCreator chunkCreator = + ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, + initialCountPercentage, this.hMemManager); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java deleted file mode 100644 index b7ac21259ef..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * A pool of {@link Chunk} instances. - * - * MemStoreChunkPool caches a number of retired chunks for reusing, it could - * decrease allocating bytes when writing, thereby optimizing the garbage - * collection on JVM. - * - * The pool instance is globally unique and could be obtained through - * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)} - * - * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating - * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called - * when MemStore clearing snapshot for flush - */ -@SuppressWarnings("javadoc") -@InterfaceAudience.Private -public class MemStoreChunkPool implements HeapMemoryTuneObserver { - private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class); - - // Static reference to the MemStoreChunkPool - static MemStoreChunkPool GLOBAL_INSTANCE; - /** Boolean whether we have disabled the memstore chunk pool entirely. */ - static boolean chunkPoolDisabled = false; - - private int maxCount; - - // A queue of reclaimed chunks - private final BlockingQueue reclaimedChunks; - private final int chunkSize; - private final float poolSizePercentage; - - /** Statistics thread schedule pool */ - private final ScheduledExecutorService scheduleThreadPool; - /** Statistics thread */ - private static final int statThreadPeriod = 60 * 5; - private final AtomicLong chunkCount = new AtomicLong(); - private final AtomicLong reusedChunkCount = new AtomicLong(); - private final boolean offheap; - - MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage, - boolean offheap) { - this.maxCount = maxCount; - this.chunkSize = chunkSize; - this.poolSizePercentage = poolSizePercentage; - this.offheap = offheap; - this.reclaimedChunks = new LinkedBlockingQueue<>(); - for (int i = 0; i < initialCount; i++) { - Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize); - chunk.init(); - reclaimedChunks.add(chunk); - } - chunkCount.set(initialCount); - final String n = Thread.currentThread().getName(); - scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() - .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); - this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, - statThreadPeriod, TimeUnit.SECONDS); - } - - /** - * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have - * not yet created max allowed chunks count. When we have already created max allowed chunks and - * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk - * then. - * Note: Chunks returned by this pool must be put back to the pool after its use. - * @return a chunk - * @see #putbackChunk(Chunk) - * @see #putbackChunks(BlockingQueue) - */ - Chunk getChunk() { - Chunk chunk = reclaimedChunks.poll(); - if (chunk != null) { - chunk.reset(); - reusedChunkCount.incrementAndGet(); - } else { - // Make a chunk iff we have not yet created the maxCount chunks - while (true) { - long created = this.chunkCount.get(); - if (created < this.maxCount) { - chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize); - if (this.chunkCount.compareAndSet(created, created + 1)) { - break; - } - } else { - break; - } - } - } - return chunk; - } - - /** - * Add the chunks to the pool, when the pool achieves the max size, it will - * skip the remaining chunks - * @param chunks - */ - synchronized void putbackChunks(BlockingQueue chunks) { - int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); - Chunk chunk = null; - while ((chunk = chunks.poll()) != null && toAdd > 0) { - reclaimedChunks.add(chunk); - toAdd--; - } - } - - /** - * Add the chunk to the pool, if the pool has achieved the max size, it will - * skip it - * @param chunk - */ - synchronized void putbackChunk(Chunk chunk) { - if (reclaimedChunks.size() < this.maxCount) { - reclaimedChunks.add(chunk); - } - } - - int getPoolSize() { - return this.reclaimedChunks.size(); - } - - /* - * Only used in testing - */ - void clearChunks() { - this.reclaimedChunks.clear(); - } - - private class StatisticsThread extends Thread { - StatisticsThread() { - super("MemStoreChunkPool.StatisticsThread"); - setDaemon(true); - } - - @Override - public void run() { - logStats(); - } - - private void logStats() { - if (!LOG.isDebugEnabled()) return; - long created = chunkCount.get(); - long reused = reusedChunkCount.get(); - long total = created + reused; - LOG.debug("Stats: current pool size=" + reclaimedChunks.size() - + ",created chunk count=" + created - + ",reused chunk count=" + reused - + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( - (float) reused / (float) total, 2))); - } - } - - /** - * @return the global MemStoreChunkPool instance - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", - justification = "Method is called by single thread at the starting of RS") - static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage, - float initialCountPercentage, int chunkSize, boolean offheap) { - if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; - if (chunkPoolDisabled) return null; - - if (poolSizePercentage <= 0) { - chunkPoolDisabled = true; - return null; - } - if (poolSizePercentage > 1.0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); - } - int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize); - if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); - } - int initialCount = (int) (initialCountPercentage * maxCount); - LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) - + ", max count " + maxCount + ", initial count " + initialCount); - GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage, - offheap); - return GLOBAL_INSTANCE; - } - - /** - * @return The singleton instance of this pool. - */ - static MemStoreChunkPool getPool() { - return GLOBAL_INSTANCE; - } - - int getMaxCount() { - return this.maxCount; - } - - @VisibleForTesting - static void clearDisableFlag() { - chunkPoolDisabled = false; - } - - @Override - public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { - // don't do any tuning in case of offheap memstore - if (this.offheap) { - LOG.warn("Not tuning the chunk pool as it is offheap"); - return; - } - int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); - if (newMaxCount != this.maxCount) { - // We need an adjustment in the chunks numbers - if (newMaxCount > this.maxCount) { - // Max chunks getting increased. Just change the variable. Later calls to getChunk() would - // create and add them to Q - LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - } else { - // Max chunks getting decreased. We may need to clear off some of the pooled chunks now - // itself. If the extra chunks are serving already, do not pool those when we get them back - LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - if (this.reclaimedChunks.size() > newMaxCount) { - synchronized (this) { - while (this.reclaimedChunks.size() > newMaxCount) { - this.reclaimedChunks.poll(); - } - } - } - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index f6d1607f0f5..72e937ce0c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; *

* The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks from * and then doles it out to threads that request slices into the array. These chunks can get pooled - * as well. See {@link MemStoreChunkPool}. + * as well. See {@link ChunkCreator}. *

* The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all * Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; * {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this * cell's data and copies into this area and then recreate a Cell over this copied data. *

- * @see MemStoreChunkPool + * @see ChunkCreator */ @InterfaceAudience.Private public interface MemStoreLAB { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 4e871354621..4fba82d24a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -18,23 +18,26 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.nio.ByteBuffer; +import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - /** * A memstore-local allocation buffer. *

@@ -55,8 +58,8 @@ import com.google.common.base.Preconditions; * would provide a performance improvement - probably would speed up the * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached * anyway. - * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}. - * When the Chunk comes pool, it can be either an on heap or an off heap backed chunk. The chunks, + * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}. + * When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks, * which this MemStoreLAB creates on its own (when no chunk available from pool), those will be * always on heap backed. */ @@ -66,14 +69,15 @@ public class MemStoreLABImpl implements MemStoreLAB { static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class); private AtomicReference curChunk = new AtomicReference<>(); - // A queue of chunks from pool contained by this memstore LAB - // TODO: in the future, it would be better to have List implementation instead of Queue, - // as FIFO order is not so important here + // Lock to manage multiple handlers requesting for a chunk + private ReentrantLock lock = new ReentrantLock(); + + // A set of chunks contained by this memstore LAB @VisibleForTesting - BlockingQueue pooledChunkQueue = null; + Set chunks = new ConcurrentSkipListSet(); private final int chunkSize; private final int maxAlloc; - private final MemStoreChunkPool chunkPool; + private final ChunkCreator chunkCreator; // This flag is for closing this instance, its set when clearing snapshot of // memstore @@ -92,20 +96,12 @@ public class MemStoreLABImpl implements MemStoreLAB { public MemStoreLABImpl(Configuration conf) { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); - this.chunkPool = MemStoreChunkPool.getPool(); - // currently chunkQueue is only used for chunkPool - if (this.chunkPool != null) { - // set queue length to chunk pool max count to avoid keeping reference of - // too many non-reclaimable chunks - pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount()); - } - + this.chunkCreator = ChunkCreator.getInstance(); // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument(maxAlloc <= chunkSize, MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); } - @Override public Cell copyCellInto(Cell cell) { int size = KeyValueUtil.length(cell); @@ -118,19 +114,52 @@ public class MemStoreLABImpl implements MemStoreLAB { Chunk c = null; int allocOffset = 0; while (true) { + // Try to get the chunk c = getOrMakeChunk(); + // we may get null because the some other thread succeeded in getting the lock + // and so the current thread has to try again to make its chunk or grab the chunk + // that the other thread created // Try to allocate from this chunk - allocOffset = c.alloc(size); - if (allocOffset != -1) { - // We succeeded - this is the common case - small alloc - // from a big buffer - break; + if (c != null) { + allocOffset = c.alloc(size); + if (allocOffset != -1) { + // We succeeded - this is the common case - small alloc + // from a big buffer + break; + } + // not enough space! + // try to retire this chunk + tryRetireChunk(c); } - // not enough space! - // try to retire this chunk - tryRetireChunk(c); } - return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size); + return copyToChunkCell(cell, c.getData(), allocOffset, size); + } + + /** + * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid + * out of it + */ + private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { + int tagsLen = cell.getTagsLength(); + if (cell instanceof ExtendedCell) { + ((ExtendedCell) cell).write(buf, offset); + } else { + // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the + // other case also. The data fragments within Cell is copied into buf as in KeyValue + // serialization format only. + KeyValueUtil.appendTo(cell, buf, offset, true); + } + // TODO : write the seqid here. For writing seqId we should create a new cell type so + // that seqId is not used as the state + if (tagsLen == 0) { + // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class + // which directly return tagsLen as 0. So we avoid parsing many length components in + // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell + // call getTagsLength(). + return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); + } else { + return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); + } } /** @@ -142,9 +171,9 @@ public class MemStoreLABImpl implements MemStoreLAB { this.closed = true; // We could put back the chunks to pool for reusing only when there is no // opening scanner which will read their data - if (chunkPool != null && openScannerCount.get() == 0 - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.pooledChunkQueue); + int count = openScannerCount.get(); + if(count == 0) { + recycleChunks(); } } @@ -162,9 +191,14 @@ public class MemStoreLABImpl implements MemStoreLAB { @Override public void decScannerCount() { int count = this.openScannerCount.decrementAndGet(); - if (this.closed && chunkPool != null && count == 0 - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.pooledChunkQueue); + if (this.closed && count == 0) { + recycleChunks(); + } + } + + private void recycleChunks() { + if (reclaimed.compareAndSet(false, true)) { + chunkCreator.putbackChunks(chunks); } } @@ -190,45 +224,33 @@ public class MemStoreLABImpl implements MemStoreLAB { * allocate a new one from the JVM. */ private Chunk getOrMakeChunk() { - while (true) { - // Try to get the chunk - Chunk c = curChunk.get(); - if (c != null) { - return c; - } - - // No current chunk, so we want to allocate one. We race - // against other allocators to CAS in an uninitialized chunk - // (which is cheap to allocate) - if (chunkPool != null) { - c = chunkPool.getChunk(); - } - boolean pooledChunk = false; - if (c != null) { - // This is chunk from pool - pooledChunk = true; - } else { - c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap. - } - if (curChunk.compareAndSet(null, c)) { - // we won race - now we need to actually do the expensive - // allocation step - c.init(); - if (pooledChunk) { - if (!this.closed && !this.pooledChunkQueue.offer(c)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " - + pooledChunkQueue.size()); - } - } - } - return c; - } else if (pooledChunk) { - chunkPool.putbackChunk(c); - } - // someone else won race - that's fine, we'll try to grab theirs - // in the next iteration of the loop. + // Try to get the chunk + Chunk c = curChunk.get(); + if (c != null) { + return c; } + // No current chunk, so we want to allocate one. We race + // against other allocators to CAS in an uninitialized chunk + // (which is cheap to allocate) + if (lock.tryLock()) { + try { + // once again check inside the lock + c = curChunk.get(); + if (c != null) { + return c; + } + c = this.chunkCreator.getChunk(); + if (c != null) { + // set the curChunk. No need of CAS as only one thread will be here + curChunk.set(c); + chunks.add(c.getId()); + return c; + } + } finally { + lock.unlock(); + } + } + return null; } @VisibleForTesting @@ -236,8 +258,15 @@ public class MemStoreLABImpl implements MemStoreLAB { return this.curChunk.get(); } - + @VisibleForTesting BlockingQueue getPooledChunks() { - return this.pooledChunkQueue; + BlockingQueue pooledChunks = new LinkedBlockingQueue<>(); + for (Integer id : this.chunks) { + Chunk chunk = chunkCreator.getChunk(id); + if (chunk != null && chunk.isFromPool()) { + pooledChunks.add(chunk); + } + } + return pooledChunks; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java new file mode 100644 index 00000000000..a8ba50c8558 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + + +/** + * ByteBuffer based cell which has the chunkid at the 0th offset and with no tags + * @see MemStoreLAB + */ +@InterfaceAudience.Private +public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue { + + public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { + super(buf, offset, length, seqId); + } + + @Override + public int getChunkId() { + // The chunkId is embedded at the 0th offset of the bytebuffer + return ByteBufferUtils.toInt(buf, 0); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java index ed98cfaae09..e244a33d802 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java @@ -21,34 +21,27 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.common.base.Preconditions; - /** * An off heap chunk implementation. */ @InterfaceAudience.Private public class OffheapChunk extends Chunk { - OffheapChunk(int size) { - super(size); + OffheapChunk(int size, int id) { + // better if this is always created fromPool. This should not be called + super(size, id); + } + + OffheapChunk(int size, int id, boolean fromPool) { + super(size, id, fromPool); + assert fromPool == true; } @Override - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = ByteBuffer.allocateDirect(this.size); - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; + void allocateDataBuffer() { + if (data == null) { + data = ByteBuffer.allocateDirect(this.size); + data.putLong(0, this.getId()); } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java index bd33cb5d196..da34e24bf7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java @@ -21,33 +21,25 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.common.base.Preconditions; - /** * An on heap chunk implementation. */ @InterfaceAudience.Private public class OnheapChunk extends Chunk { - OnheapChunk(int size) { - super(size); + OnheapChunk(int size, int id) { + super(size, id); } - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = ByteBuffer.allocate(this.size); - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; - } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); + OnheapChunk(int size, int id, boolean fromPool) { + super(size, id, fromPool); } -} + + @Override + void allocateDataBuffer() { + if (data == null) { + data = ByteBuffer.allocate(this.size); + data.putLong(0, this.getId()); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index acf2af06e46..c0ddbfc357c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -95,6 +95,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -2424,6 +2426,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir, final Configuration conf, final HTableDescriptor htd, boolean initialize) throws IOException { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); WAL wal = createWal(conf, rootDir, info); return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 422c54bf8a2..8d8b6df5fe2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -49,8 +50,10 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -397,6 +400,7 @@ public class TestCoprocessorInterface { for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); } + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(tableName, null, null, false); Path path = new Path(DIR + callingMethod); Region r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 80d0e3ac139..b99087d54ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -47,10 +47,12 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -152,6 +154,7 @@ public class TestRegionObserverScannerOpenHook { for (byte[] family : families) { htd.addFamily(new HColumnDescriptor(family)); } + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); Path path = new Path(DIR + callingMethod); WAL wal = HBaseTestingUtility.createWal(conf, path, info); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java index 2e44dee10cb..15d449db297 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; @@ -100,6 +102,7 @@ public class TestRegionObserverStacking extends TestCase { for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); } + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); Path path = new Path(DIR + callingMethod); HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java index f1775d0e470..fae724702db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java @@ -40,8 +40,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -342,6 +344,7 @@ public class TestScannerFromBucketCache { private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly, byte[]... families) throws IOException { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log"); HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey); final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index cc73d9df0c6..32bce263b80 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -73,6 +75,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.Triple; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -88,6 +91,10 @@ public class TestCatalogJanitor { @Rule public TestName name = new TestName(); + @BeforeClass + public static void setup() throws Exception { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + } /** * Mock MasterServices for tests below. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 418aadf562e..096c5ef59ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -241,7 +241,7 @@ public class TestBulkLoad { for (byte[] family : families) { hTableDescriptor.addFamily(new HColumnDescriptor(family)); } - + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); // TODO We need a way to do this without creating files return HRegion.createHRegion(hRegionInfo, new Path(testFolder.newFolder().toURI()), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java index 3b4d0685b51..09877b01b46 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase { descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true); CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - MemStoreChunkPool.chunkPoolDisabled = false; + ChunkCreator.chunkPoolDisabled = false; } /* Create and test CellSet based on CellArrayMap */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index a888c4527a8..9e90f3e39f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -50,7 +51,7 @@ import static org.junit.Assert.assertTrue; public class TestCompactingMemStore extends TestDefaultMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class); - protected static MemStoreChunkPool chunkPool; + protected static ChunkCreator chunkCreator; protected HRegion region; protected RegionServicesForStores regionServicesForStores; protected HStore store; @@ -65,7 +66,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @After public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); } @Override @@ -84,15 +85,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore { conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf); HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); - this.region = hbaseUtility.createTestRegion("foobar", hcd); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar")); + htd.addFamily(hcd); + HRegionInfo info = + new HRegionInfo(TableName.valueOf("foobar"), null, null, false); + WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); + this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); + //this.region = hbaseUtility.createTestRegion("foobar", hcd); this.regionServicesForStores = region.getRegionServicesForStores(); this.store = new HStore(region, hcd, conf); long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, - MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); - assertTrue(chunkPool != null); + chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + assertTrue(chunkCreator != null); } /** @@ -390,7 +397,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } @@ -434,16 +441,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot @@ -464,7 +471,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test @@ -516,16 +523,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf1, 3, val), null); assertEquals(3, memstore.getActive().getCellsCount()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot @@ -553,7 +560,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } ////////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index 5a48455eeb2..66e107ae656 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -44,17 +44,13 @@ import java.util.List; public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class); - //private static MemStoreChunkPool chunkPool; - //private HRegion region; - //private RegionServicesForStores regionServicesForStores; - //private HStore store; ////////////////////////////////////////////////////////////////////////////// // Helpers ////////////////////////////////////////////////////////////////////////////// @Override public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); } @Override public void setUp() throws Exception { @@ -408,16 +404,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot @@ -438,7 +434,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test @@ -472,7 +468,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java index 8e85730a524..e32036877e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java @@ -164,6 +164,7 @@ public class TestCompactionArchiveConcurrentClose { HRegionFileSystem fs = new WaitingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java index 89b23689793..e7fcf18a107 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java @@ -174,6 +174,7 @@ public class TestCompactionArchiveIOException { private HRegion initHRegion(HTableDescriptor htd, HRegionInfo info) throws IOException { Configuration conf = testUtil.getConfiguration(); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName()); Path regionDir = new Path(tableDir, info.getEncodedName()); Path storeDir = new Path(regionDir, htd.getColumnFamilies()[0].getNameAsString()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java index 58dbe8deb34..543ca6ccb44 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java @@ -101,6 +101,7 @@ public class TestCompactionPolicy { HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); hlog = new FSHLog(fs, basedir, logName, conf); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); region = HRegion.createHRegion(info, basedir, conf, htd, hlog); region.close(); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 7434eb1d7b0..41b304b3309 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -84,6 +85,7 @@ public class TestDefaultMemStore { protected static final byte[] FAMILY = Bytes.toBytes("column"); protected MultiVersionConcurrencyControl mvcc; protected AtomicLong startSeqNum = new AtomicLong(0); + protected ChunkCreator chunkCreator; private String getName() { return this.name.getMethodName(); @@ -92,9 +94,17 @@ public class TestDefaultMemStore { @Before public void setUp() throws Exception { internalSetUp(); + // no pool + this.chunkCreator = + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); this.memstore = new DefaultMemStore(); } + @AfterClass + public static void tearDownClass() throws Exception { + ChunkCreator.getInstance().clearChunkIds(); + } + protected void internalSetUp() throws Exception { this.mvcc = new MultiVersionConcurrencyControl(); } @@ -129,7 +139,9 @@ public class TestDefaultMemStore { assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize()); // make sure chunk size increased even when writing the same cell, if using MSLAB if (msLab instanceof MemStoreLABImpl) { - assertEquals(2 * Segment.getCellLength(kv), + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG, ((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset()); } } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 73fb9cf4c0e..24e850daff5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -266,6 +266,7 @@ public class TestFailedAppendAndSync { */ public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) throws IOException { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index b416c7d9e53..0f24a24f0c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -153,7 +153,7 @@ public class TestHMobStore { htd.addFamily(hcd); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); - + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); final WALFactory wals = new WALFactory(walConf, null, methodName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index d56d6ecf2ac..095f4bd27dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterAllFilter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.NullComparator; @@ -4931,6 +4932,7 @@ public class TestHRegion { String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) throws IOException { Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey); final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri); return initHRegion(tableName, startKey, stopKey, isReadOnly, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 005464262bb..6eed7df9faa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -153,7 +153,7 @@ public class TestHRegionReplayEvents { } time = System.currentTimeMillis(); - + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); primaryHri = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, time, 0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 37a7664b77b..1768801a647 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -48,30 +48,30 @@ import static org.junit.Assert.assertTrue; @Category({RegionServerTests.class, SmallTests.class}) public class TestMemStoreChunkPool { private final static Configuration conf = new Configuration(); - private static MemStoreChunkPool chunkPool; + private static ChunkCreator chunkCreator; private static boolean chunkPoolDisabledBeforeTest; @BeforeClass public static void setUpBeforeClass() throws Exception { conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; - MemStoreChunkPool.chunkPoolDisabled = false; + chunkPoolDisabledBeforeTest = ChunkCreator.chunkPoolDisabled; + ChunkCreator.chunkPoolDisabled = false; long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, - MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); - assertTrue(chunkPool != null); + chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + assertTrue(chunkCreator != null); } @AfterClass public static void tearDownAfterClass() throws Exception { - MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest; + ChunkCreator.chunkPoolDisabled = chunkPoolDisabledBeforeTest; } @Before public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); } @Test @@ -90,7 +90,7 @@ public class TestMemStoreChunkPool { int size = KeyValueUtil.length(kv); ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); if (newKv.getBuffer() != lastBuffer) { - expectedOff = 0; + expectedOff = 8; lastBuffer = newKv.getBuffer(); } assertEquals(expectedOff, newKv.getOffset()); @@ -100,14 +100,14 @@ public class TestMemStoreChunkPool { } // chunks will be put back to pool after close mslab.close(); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); // reconstruct mslab mslab = new MemStoreLABImpl(conf); // chunk should be got from the pool, so we can reuse it. KeyValue kv = new KeyValue(rk, cf, q, new byte[10]); mslab.copyCellInto(kv); - assertEquals(chunkCount - 1, chunkPool.getPoolSize()); + assertEquals(chunkCount - 1, chunkCreator.getPoolSize()); } @Test @@ -143,7 +143,7 @@ public class TestMemStoreChunkPool { } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } @@ -189,16 +189,16 @@ public class TestMemStoreChunkPool { } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot snapshot = memstore.snapshot(); @@ -218,20 +218,20 @@ public class TestMemStoreChunkPool { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test public void testPutbackChunksMultiThreaded() throws Exception { - MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE; final int maxCount = 10; final int initialCount = 5; - final int chunkSize = 30; + final int chunkSize = 40; final int valSize = 7; - MemStoreChunkPool pool = new MemStoreChunkPool(chunkSize, maxCount, initialCount, 1, false); - assertEquals(initialCount, pool.getPoolSize()); - assertEquals(maxCount, pool.getMaxCount()); - MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created. + ChunkCreator oldCreator = ChunkCreator.getInstance(); + ChunkCreator newCreator = new ChunkCreator(chunkSize, false, 400, 1, 0.5f, null); + assertEquals(initialCount, newCreator.getPoolSize()); + assertEquals(maxCount, newCreator.getMaxCount()); + ChunkCreator.INSTANCE = newCreator;// Replace the global ref with the new one we created. // Used it for the testing. Later in finally we put // back the original final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), @@ -258,9 +258,9 @@ public class TestMemStoreChunkPool { t1.join(); t2.join(); t3.join(); - assertTrue(pool.getPoolSize() <= maxCount); + assertTrue(newCreator.getPoolSize() <= maxCount); } finally { - MemStoreChunkPool.GLOBAL_INSTANCE = oldPool; + ChunkCreator.INSTANCE = oldCreator; } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 141b8023e2f..63e63ead1cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -63,8 +63,8 @@ public class TestMemStoreLAB { public static void setUpBeforeClass() throws Exception { long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, - MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, + 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); } /** @@ -76,6 +76,7 @@ public class TestMemStoreLAB { MemStoreLAB mslab = new MemStoreLABImpl(); int expectedOff = 0; ByteBuffer lastBuffer = null; + long lastChunkId = -1; // 100K iterations by 0-1K alloc -> 50MB expected // should be reasonable for unit test and also cover wraparound // behavior @@ -85,8 +86,13 @@ public class TestMemStoreLAB { int size = KeyValueUtil.length(kv); ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); if (newKv.getBuffer() != lastBuffer) { - expectedOff = 0; + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + expectedOff = Bytes.SIZEOF_LONG; lastBuffer = newKv.getBuffer(); + long chunkId = newKv.getBuffer().getLong(0); + assertTrue("chunkid should be different", chunkId != lastChunkId); + lastChunkId = chunkId; } assertEquals(expectedOff, newKv.getOffset()); assertTrue("Allocation overruns buffer", @@ -136,23 +142,21 @@ public class TestMemStoreLAB { }; ctx.addThread(t); } - + ctx.startThreads(); while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) { Thread.sleep(10); } ctx.stop(); - // Partition the allocations by the actual byte[] they point into, // make sure offsets are unique for each chunk Map> mapsByChunk = Maps.newHashMap(); - + int sizeCounted = 0; for (AllocRecord rec : Iterables.concat(allocations)) { sizeCounted += rec.size; if (rec.size == 0) continue; - Map mapForThisByteArray = mapsByChunk.get(rec.alloc); if (mapForThisByteArray == null) { @@ -167,7 +171,9 @@ public class TestMemStoreLAB { // Now check each byte array to make sure allocations don't overlap for (Map allocsInChunk : mapsByChunk.values()) { - int expectedOff = 0; + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + int expectedOff = Bytes.SIZEOF_LONG; for (AllocRecord alloc : allocsInChunk.values()) { assertEquals(expectedOff, alloc.offset); assertTrue("Allocation overruns buffer", @@ -175,7 +181,6 @@ public class TestMemStoreLAB { expectedOff += alloc.size; } } - } /** @@ -185,54 +190,72 @@ public class TestMemStoreLAB { */ @Test public void testLABChunkQueue() throws Exception { - MemStoreLABImpl mslab = new MemStoreLABImpl(); - // by default setting, there should be no chunks initialized in the pool - assertTrue(mslab.getPooledChunks().isEmpty()); - // reset mslab with chunk pool - Configuration conf = HBaseConfiguration.create(); - conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1); - // set chunk size to default max alloc size, so we could easily trigger chunk retirement - conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT); - // reconstruct mslab - MemStoreChunkPool.clearDisableFlag(); - mslab = new MemStoreLABImpl(conf); - // launch multiple threads to trigger frequent chunk retirement - List threads = new ArrayList<>(); - final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), - new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 24]); - for (int i = 0; i < 10; i++) { - threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv)); - } - for (Thread thread : threads) { - thread.start(); - } - // let it run for some time - Thread.sleep(1000); - for (Thread thread : threads) { - thread.interrupt(); - } - boolean threadsRunning = true; - while (threadsRunning) { + ChunkCreator oldInstance = null; + try { + MemStoreLABImpl mslab = new MemStoreLABImpl(); + // by default setting, there should be no chunks initialized in the pool + assertTrue(mslab.getPooledChunks().isEmpty()); + oldInstance = ChunkCreator.INSTANCE; + ChunkCreator.INSTANCE = null; + // reset mslab with chunk pool + Configuration conf = HBaseConfiguration.create(); + conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1); + // set chunk size to default max alloc size, so we could easily trigger chunk retirement + conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT); + // reconstruct mslab + long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() + .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); + ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false, + globalMemStoreLimit, 0.1f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + ChunkCreator.clearDisableFlag(); + mslab = new MemStoreLABImpl(conf); + // launch multiple threads to trigger frequent chunk retirement + List threads = new ArrayList<>(); + final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), + new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]); + for (int i = 0; i < 10; i++) { + threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv)); + } for (Thread thread : threads) { - if (thread.isAlive()) { - threadsRunning = true; - break; + thread.start(); + } + // let it run for some time + Thread.sleep(1000); + for (Thread thread : threads) { + thread.interrupt(); + } + boolean threadsRunning = true; + boolean alive = false; + while (threadsRunning) { + alive = false; + for (Thread thread : threads) { + if (thread.isAlive()) { + alive = true; + break; + } + } + if (!alive) { + threadsRunning = false; } } - threadsRunning = false; + // none of the chunkIds would have been returned back + assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0); + // close the mslab + mslab.close(); + // make sure all chunks reclaimed or removed from chunk queue + int queueLength = mslab.getPooledChunks().size(); + assertTrue("All chunks in chunk queue should be reclaimed or removed" + + " after mslab closed but actually: " + queueLength, + queueLength == 0); + } finally { + ChunkCreator.INSTANCE = oldInstance; } - // close the mslab - mslab.close(); - // make sure all chunks reclaimed or removed from chunk queue - int queueLength = mslab.getPooledChunks().size(); - assertTrue("All chunks in chunk queue should be reclaimed or removed" - + " after mslab closed but actually: " + queueLength, queueLength == 0); } private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, Cell cellToCopyInto) { Thread thread = new Thread() { - boolean stopped = false; + volatile boolean stopped = false; @Override public void run() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java new file mode 100644 index 00000000000..1af98e99c8d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java @@ -0,0 +1,168 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ByteBufferKeyValue; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestMemstoreLABWithoutPool { + private final static Configuration conf = new Configuration(); + + private static final byte[] rk = Bytes.toBytes("r1"); + private static final byte[] cf = Bytes.toBytes("f"); + private static final byte[] q = Bytes.toBytes("q"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() + .getMax() * 0.8); + // disable pool + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT + Bytes.SIZEOF_LONG, false, globalMemStoreLimit, + 0.0f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + } + + /** + * Test a bunch of random allocations + */ + @Test + public void testLABRandomAllocation() { + Random rand = new Random(); + MemStoreLAB mslab = new MemStoreLABImpl(); + int expectedOff = 0; + ByteBuffer lastBuffer = null; + long lastChunkId = -1; + // 100K iterations by 0-1K alloc -> 50MB expected + // should be reasonable for unit test and also cover wraparound + // behavior + for (int i = 0; i < 100000; i++) { + int valSize = rand.nextInt(1000); + KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); + int size = KeyValueUtil.length(kv); + ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); + if (newKv.getBuffer() != lastBuffer) { + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + expectedOff = Bytes.SIZEOF_LONG; + lastBuffer = newKv.getBuffer(); + long chunkId = newKv.getBuffer().getLong(0); + assertTrue("chunkid should be different", chunkId != lastChunkId); + lastChunkId = chunkId; + } + assertEquals(expectedOff, newKv.getOffset()); + assertTrue("Allocation overruns buffer", + newKv.getOffset() + size <= newKv.getBuffer().capacity()); + expectedOff += size; + } + } + + /** + * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure + * there's no memory leak (HBASE-16195) + * @throws Exception if any error occurred + */ + @Test + public void testLABChunkQueueWithMultipleMSLABs() throws Exception { + Configuration conf = HBaseConfiguration.create(); + MemStoreLABImpl[] mslab = new MemStoreLABImpl[10]; + for (int i = 0; i < 10; i++) { + mslab[i] = new MemStoreLABImpl(conf); + } + // launch multiple threads to trigger frequent chunk retirement + List threads = new ArrayList<>(); + // create smaller sized kvs + final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), + new byte[0]); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + threads.add(getChunkQueueTestThread(mslab[i], "testLABChunkQueue-" + j, kv)); + } + } + for (Thread thread : threads) { + thread.start(); + } + // let it run for some time + Thread.sleep(3000); + for (Thread thread : threads) { + thread.interrupt(); + } + boolean threadsRunning = true; + boolean alive = false; + while (threadsRunning) { + alive = false; + for (Thread thread : threads) { + if (thread.isAlive()) { + alive = true; + break; + } + } + if (!alive) { + threadsRunning = false; + } + } + // close the mslab + for (int i = 0; i < 10; i++) { + mslab[i].close(); + } + // all of the chunkIds would have been returned back + assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0); + } + + private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, + Cell cellToCopyInto) { + Thread thread = new Thread() { + volatile boolean stopped = false; + + @Override + public void run() { + while (!stopped) { + // keep triggering chunk retirement + mslab.copyCellInto(cellToCopyInto); + } + } + + @Override + public void interrupt() { + this.stopped = true; + } + }; + thread.setName(threadName); + thread.setDaemon(true); + return thread; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java index 4315bd42f42..7160e5e9cba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java @@ -108,6 +108,7 @@ public class TestRecoveredEdits { } }; Path hbaseRootDir = TEST_UTIL.getDataTestDir(); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); Path tableDir = FSUtils.getTableDir(hbaseRootDir, htd.getTableName()); HRegionFileSystem hrfs = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java index 5d11c0ece47..ad56081a6c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java @@ -83,6 +83,7 @@ public class TestRegionIncrement { private HRegion getRegion(final Configuration conf, final String tableName) throws IOException { WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), TEST_UTIL.getDataTestDir().toString(), conf); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index bf0fb051eeb..c1fd6a3376f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -60,12 +60,13 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import com.google.common.collect.Lists; -import org.junit.rules.TestName; /** * Test cases against ReversibleKeyValueScanner @@ -91,6 +92,10 @@ public class TestReversibleScanners { @Rule public TestName name = new TestName(); + @BeforeClass + public static void setUp() { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + } @Test public void testReversibleStoreFileScanner() throws IOException { FileSystem fs = TEST_UTIL.getTestFileSystem(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 76bf1ccb068..64e8397c565 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -178,6 +178,7 @@ public class TestStore { } else { htd.addFamily(hcd); } + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java index 3cdb227dbcb..99dd00db995 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java @@ -111,6 +111,7 @@ public class TestStoreFileRefresherChore { final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); final WALFactory wals = new WALFactory(walConf, null, "log_" + replicaId); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()), conf, htd, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 4f247b0cb37..51260a6f71b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -588,6 +588,7 @@ public class TestWALLockup { */ public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) throws IOException { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java index 994779f6639..e63bad91240 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java @@ -98,6 +98,7 @@ public class TestWALMonotonicallyIncreasingSeqId { FSUtils.setRootDir(walConf, tableDir); this.walConf = walConf; wals = new WALFactory(walConf, null, "log_" + replicaId); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()), conf, htd, null); region.initialize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 0be7b3157c0..09d27af8462 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -54,7 +54,9 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java index f976b492711..057b9bfd348 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java @@ -37,7 +37,9 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -288,6 +290,7 @@ public class TestDurability { throw new IOException("Failed delete of " + path); } } + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return HRegion.createHRegion(info, path, CONF, htd, log); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 8847c4c0d85..2cf576c42a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -37,7 +37,9 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -153,7 +155,7 @@ public class TestFSHLog extends AbstractTestFSWAL { new HTableDescriptor(TableName.valueOf(this.name.getMethodName())).addFamily(new HColumnDescriptor(b)); HRegionInfo hri = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log); ExecutorService exec = Executors.newFixedThreadPool(2);