diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 56de21bac0f..e1bc969fb1b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -3135,4 +3135,28 @@ public final class CellUtil { return Type.DeleteFamily.getCode(); } } + + /** + * Clone the passed cell by copying its data into the passed buf. + */ + public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) { + int tagsLen = cell.getTagsLength(); + if (cell instanceof ExtendedCell) { + ((ExtendedCell) cell).write(buf, offset); + } else { + // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the + // other case also. The data fragments within Cell is copied into buf as in KeyValue + // serialization format only. + KeyValueUtil.appendTo(cell, buf, offset, true); + } + if (tagsLen == 0) { + // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class + // which directly return tagsLen as 0. So we avoid parsing many length components in + // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell + // call getTagsLength(). + return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); + } else { + return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); + } + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java index 10f20ca49a3..517873f7c93 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.io.HeapSize; public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize, Cloneable { - public static int CELL_NOT_BASED_ON_CHUNK = -1; /** * Write this cell to an OutputStream in a {@link KeyValue} format. *
KeyValue format
@@ -74,13 +73,4 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam * @return The deep cloned cell */ Cell deepClone(); - - /** - * Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized - * chunks as in case of MemstoreLAB - * @return the chunk id if the cell is backed by fixed sized Chunks, else return -1 - */ - default int getChunkId() { - return CELL_NOT_BASED_ON_CHUNK; - } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index f9670e14d11..bb9f282acb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -748,8 +748,6 @@ public class HMaster extends HRegionServer implements MasterServices { this.masterActiveTime = System.currentTimeMillis(); // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. - // Initialize the chunkCreator - initializeMemStoreChunkCreator(); this.fileSystemManager = new MasterFileSystem(this); this.walManager = new MasterWalManager(this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java deleted file mode 100644 index a8f10006cb2..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.nio.ByteBuffer; - -import org.apache.hadoop.hbase.ByteBufferKeyValue; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ByteBufferUtils; - -/** - * ByteBuffer based cell which has the chunkid at the 0th offset - * @see MemStoreLAB - */ -//TODO : When moving this cell to CellChunkMap we will have the following things -// to be serialized -// chunkId (Integer) + offset (Integer) + length (Integer) + seqId (Long) = 20 bytes -@InterfaceAudience.Private -public class ByteBufferChunkCell extends ByteBufferKeyValue { - public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) { - super(buf, offset, length); - } - - public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { - super(buf, offset, length, seqId); - } - - @Override - public int getChunkId() { - // The chunkId is embedded at the 0th offset of the bytebuffer - return ByteBufferUtils.toInt(buf, 0); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java index fc4aa0be133..2cbf0a3eb9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java @@ -21,10 +21,8 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; /** * A chunk of memory out of which allocations are sliced. @@ -48,41 +46,13 @@ public abstract class Chunk { /** Size of chunk in bytes */ protected final int size; - // The unique id associated with the chunk. - private final int id; - - // indicates if the chunk is formed by ChunkCreator#MemstorePool - private final boolean fromPool; - /** - * Create an uninitialized chunk. Note that memory is not allocated yet, so - * this is cheap. + * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap. + * * @param size in bytes - * @param id the chunk id */ - public Chunk(int size, int id) { - this(size, id, false); - } - - /** - * Create an uninitialized chunk. Note that memory is not allocated yet, so - * this is cheap. - * @param size in bytes - * @param id the chunk id - * @param fromPool if the chunk is formed by pool - */ - public Chunk(int size, int id, boolean fromPool) { + Chunk(int size) { this.size = size; - this.id = id; - this.fromPool = fromPool; - } - - int getId() { - return this.id; - } - - boolean isFromPool() { - return this.fromPool; } /** @@ -90,24 +60,7 @@ public abstract class Chunk { * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block * until the allocation is complete. */ - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - allocateDataBuffer(); - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; - } - // Mark that it's ready for use - // Move 8 bytes since the first 8 bytes are having the chunkid in it - boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); - } - - abstract void allocateDataBuffer(); + public abstract void init(); /** * Reset the offset to UNINITIALIZED before before reusing an old chunk @@ -121,8 +74,7 @@ public abstract class Chunk { /** * Try to allocate size bytes from the chunk. - * If a chunk is tried to get allocated before init() call, the thread doing the allocation - * will be in busy-wait state as it will keep looping till the nextFreeOffset is set. + * * @return the offset of the successful allocation, or -1 to indicate not-enough-space */ public int alloc(int size) { @@ -144,7 +96,7 @@ public abstract class Chunk { if (oldOffset + size > data.capacity()) { return -1; // alloc doesn't fit } - // TODO : If seqID is to be written add 8 bytes here for nextFreeOFfset + // Try to atomically claim this chunk if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { // we got the alloc diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java deleted file mode 100644 index 073fb259756..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ /dev/null @@ -1,404 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.lang.ref.SoftReference; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated - * with every chunk - */ -@InterfaceAudience.Private -public class ChunkCreator { - private final Log LOG = LogFactory.getLog(ChunkCreator.class); - // monotonically increasing chunkid - private AtomicInteger chunkID = new AtomicInteger(1); - // maps the chunk against the monotonically increasing chunk id. We need to preserve the - // natural ordering of the key - // CellChunkMap creation should convert the soft ref to hard reference - private Map> chunkIdMap = - new ConcurrentHashMap>(); - private final int chunkSize; - private final boolean offheap; - @VisibleForTesting - static ChunkCreator INSTANCE; - @VisibleForTesting - static boolean chunkPoolDisabled = false; - private MemStoreChunkPool pool; - - @VisibleForTesting - ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage, - float initialCountPercentage, HeapMemoryManager heapMemoryManager) { - this.chunkSize = chunkSize; - this.offheap = offheap; - this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage); - if (heapMemoryManager != null && this.pool != null) { - // Register with Heap Memory manager - heapMemoryManager.registerTuneObserver(this.pool); - } - } - - /** - * Initializes the instance of MSLABChunkCreator - * @param chunkSize the chunkSize - * @param offheap indicates if the chunk is to be created offheap or not - * @param globalMemStoreSize the global memstore size - * @param poolSizePercentage pool size percentage - * @param initialCountPercentage the initial count of the chunk pool if any - * @param heapMemoryManager the heapmemory manager - * @return singleton MSLABChunkCreator - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", - justification = "Method is called by single thread at the starting of RS") - @VisibleForTesting - public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize, - float poolSizePercentage, float initialCountPercentage, HeapMemoryManager heapMemoryManager) { - if (INSTANCE != null) return INSTANCE; - INSTANCE = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, - initialCountPercentage, heapMemoryManager); - return INSTANCE; - } - - static ChunkCreator getInstance() { - return INSTANCE; - } - - /** - * Creates and inits a chunk. - * @return the chunk that was initialized - */ - Chunk getChunk() { - Chunk chunk = null; - if (pool != null) { - // the pool creates the chunk internally. The chunk#init() call happens here - chunk = this.pool.getChunk(); - // the pool has run out of maxCount - if (chunk == null) { - if (LOG.isTraceEnabled()) { - LOG.trace("The chunk pool is full. Reached maxCount= " + this.pool.getMaxCount() - + ". Creating chunk onheap."); - } - } - } - if (chunk == null) { - chunk = createChunk(); - } - // put this chunk into the chunkIdMap - this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk)); - // now we need to actually do the expensive memory allocation step in case of a new chunk, - // else only the offset is set to the beginning of the chunk to accept allocations - chunk.init(); - return chunk; - } - - private Chunk createChunk() { - return createChunk(false); - } - - /** - * Creates the chunk either onheap or offheap - * @param pool indicates if the chunks have to be created which will be used by the Pool - * @return the chunk - */ - private Chunk createChunk(boolean pool) { - int id = chunkID.getAndIncrement(); - assert id > 0; - // do not create offheap chunk on demand - if (pool && this.offheap) { - return new OffheapChunk(chunkSize, id, pool); - } else { - return new OnheapChunk(chunkSize, id, pool); - } - } - - @VisibleForTesting - // TODO : To be used by CellChunkMap - Chunk getChunk(int id) { - SoftReference ref = chunkIdMap.get(id); - if (ref != null) { - return ref.get(); - } - return null; - } - - int getChunkSize() { - return this.chunkSize; - } - - boolean isOffheap() { - return this.offheap; - } - - private void removeChunks(Set chunkIDs) { - this.chunkIdMap.keySet().removeAll(chunkIDs); - } - - Chunk removeChunk(int chunkId) { - SoftReference ref = this.chunkIdMap.remove(chunkId); - if (ref != null) { - return ref.get(); - } - return null; - } - - @VisibleForTesting - int size() { - return this.chunkIdMap.size(); - } - - @VisibleForTesting - void clearChunkIds() { - this.chunkIdMap.clear(); - } - - /** - * A pool of {@link Chunk} instances. - * - * MemStoreChunkPool caches a number of retired chunks for reusing, it could - * decrease allocating bytes when writing, thereby optimizing the garbage - * collection on JVM. - */ - private class MemStoreChunkPool implements HeapMemoryTuneObserver { - private int maxCount; - - // A queue of reclaimed chunks - private final BlockingQueue reclaimedChunks; - private final float poolSizePercentage; - - /** Statistics thread schedule pool */ - private final ScheduledExecutorService scheduleThreadPool; - /** Statistics thread */ - private static final int statThreadPeriod = 60 * 5; - private final AtomicLong chunkCount = new AtomicLong(); - private final AtomicLong reusedChunkCount = new AtomicLong(); - - MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { - this.maxCount = maxCount; - this.poolSizePercentage = poolSizePercentage; - this.reclaimedChunks = new LinkedBlockingQueue<>(); - for (int i = 0; i < initialCount; i++) { - Chunk chunk = createChunk(true); - chunk.init(); - reclaimedChunks.add(chunk); - } - chunkCount.set(initialCount); - final String n = Thread.currentThread().getName(); - scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() - .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); - this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, - statThreadPeriod, TimeUnit.SECONDS); - } - - /** - * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have - * not yet created max allowed chunks count. When we have already created max allowed chunks and - * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk - * then. - * Note: Chunks returned by this pool must be put back to the pool after its use. - * @return a chunk - * @see #putbackChunks(Set) - */ - Chunk getChunk() { - Chunk chunk = reclaimedChunks.poll(); - if (chunk != null) { - chunk.reset(); - reusedChunkCount.incrementAndGet(); - } else { - // Make a chunk iff we have not yet created the maxCount chunks - while (true) { - long created = this.chunkCount.get(); - if (created < this.maxCount) { - if (this.chunkCount.compareAndSet(created, created + 1)) { - chunk = createChunk(true); - break; - } - } else { - break; - } - } - } - return chunk; - } - - /** - * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining - * chunks - * @param chunks - */ - private void putbackChunks(Set chunks) { - int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); - Iterator iterator = chunks.iterator(); - while (iterator.hasNext()) { - Integer chunkId = iterator.next(); - // remove the chunks every time though they are from the pool or not - Chunk chunk = ChunkCreator.this.removeChunk(chunkId); - if (chunk != null) { - if (chunk.isFromPool() && toAdd > 0) { - reclaimedChunks.add(chunk); - } - toAdd--; - } - } - } - - private class StatisticsThread extends Thread { - StatisticsThread() { - super("MemStoreChunkPool.StatisticsThread"); - setDaemon(true); - } - - @Override - public void run() { - logStats(); - } - - private void logStats() { - if (!LOG.isDebugEnabled()) return; - long created = chunkCount.get(); - long reused = reusedChunkCount.get(); - long total = created + reused; - LOG.debug("Stats: current pool size=" + reclaimedChunks.size() - + ",created chunk count=" + created - + ",reused chunk count=" + reused - + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( - (float) reused / (float) total, 2))); - } - } - - private int getMaxCount() { - return this.maxCount; - } - - @Override - public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { - // don't do any tuning in case of offheap memstore - if (isOffheap()) { - LOG.warn("Not tuning the chunk pool as it is offheap"); - return; - } - int newMaxCount = - (int) (newMemstoreSize * poolSizePercentage / getChunkSize()); - if (newMaxCount != this.maxCount) { - // We need an adjustment in the chunks numbers - if (newMaxCount > this.maxCount) { - // Max chunks getting increased. Just change the variable. Later calls to getChunk() would - // create and add them to Q - LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - } else { - // Max chunks getting decreased. We may need to clear off some of the pooled chunks now - // itself. If the extra chunks are serving already, do not pool those when we get them back - LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - if (this.reclaimedChunks.size() > newMaxCount) { - synchronized (this) { - while (this.reclaimedChunks.size() > newMaxCount) { - this.reclaimedChunks.poll(); - } - } - } - } - } - } - } - - @VisibleForTesting - static void clearDisableFlag() { - chunkPoolDisabled = false; - } - - private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage, - float initialCountPercentage) { - if (poolSizePercentage <= 0) { - LOG.info("PoolSizePercentage is less than 0. So not using pool"); - return null; - } - if (chunkPoolDisabled) { - return null; - } - if (poolSizePercentage > 1.0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); - } - int maxCount = (int) (globalMemStoreSize * poolSizePercentage / getChunkSize()); - if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); - } - int initialCount = (int) (initialCountPercentage * maxCount); - LOG.info("Allocating MemStoreChunkPool with chunk size " - + StringUtils.byteDesc(getChunkSize()) + ", max count " + maxCount - + ", initial count " + initialCount); - return new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage); - } - - @VisibleForTesting - int getMaxCount() { - if (pool != null) { - return pool.getMaxCount(); - } - return 0; - } - - @VisibleForTesting - int getPoolSize() { - if (pool != null) { - return pool.reclaimedChunks.size(); - } - return 0; - } - - /* - * Only used in testing - */ - @VisibleForTesting - void clearChunksInPool() { - if (pool != null) { - pool.reclaimedChunks.clear(); - } - } - - synchronized void putbackChunks(Set chunks) { - if (pool != null) { - pool.putbackChunks(chunks); - } else { - this.removeChunks(chunks); - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 41eb0a356be..b3b5113e350 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1472,7 +1472,7 @@ public class HRegionServer extends HasThread implements startServiceThreads(); startHeapMemoryManager(); // Call it after starting HeapMemoryManager. - initializeMemStoreChunkCreator(); + initializeMemStoreChunkPool(); LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + ", sessionid=0x" + @@ -1492,7 +1492,7 @@ public class HRegionServer extends HasThread implements } } - protected void initializeMemStoreChunkCreator() { + private void initializeMemStoreChunkPool() { if (MemStoreLAB.isEnabled(conf)) { // MSLAB is enabled. So initialize MemStoreChunkPool // By this time, the MemstoreFlusher is already initialized. We can get the global limits from @@ -1506,10 +1506,12 @@ public class HRegionServer extends HasThread implements float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); - // init the chunkCreator - ChunkCreator chunkCreator = - ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, - initialCountPercentage, this.hMemManager); + MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage, + initialCountPercentage, chunkSize, offheap); + if (pool != null && this.hMemManager != null) { + // Register with Heap Memory manager + this.hMemManager.registerTuneObserver(pool); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java new file mode 100644 index 00000000000..b7ac21259ef --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -0,0 +1,265 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A pool of {@link Chunk} instances. + * + * MemStoreChunkPool caches a number of retired chunks for reusing, it could + * decrease allocating bytes when writing, thereby optimizing the garbage + * collection on JVM. + * + * The pool instance is globally unique and could be obtained through + * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)} + * + * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating + * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called + * when MemStore clearing snapshot for flush + */ +@SuppressWarnings("javadoc") +@InterfaceAudience.Private +public class MemStoreChunkPool implements HeapMemoryTuneObserver { + private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class); + + // Static reference to the MemStoreChunkPool + static MemStoreChunkPool GLOBAL_INSTANCE; + /** Boolean whether we have disabled the memstore chunk pool entirely. */ + static boolean chunkPoolDisabled = false; + + private int maxCount; + + // A queue of reclaimed chunks + private final BlockingQueue reclaimedChunks; + private final int chunkSize; + private final float poolSizePercentage; + + /** Statistics thread schedule pool */ + private final ScheduledExecutorService scheduleThreadPool; + /** Statistics thread */ + private static final int statThreadPeriod = 60 * 5; + private final AtomicLong chunkCount = new AtomicLong(); + private final AtomicLong reusedChunkCount = new AtomicLong(); + private final boolean offheap; + + MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage, + boolean offheap) { + this.maxCount = maxCount; + this.chunkSize = chunkSize; + this.poolSizePercentage = poolSizePercentage; + this.offheap = offheap; + this.reclaimedChunks = new LinkedBlockingQueue<>(); + for (int i = 0; i < initialCount; i++) { + Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize); + chunk.init(); + reclaimedChunks.add(chunk); + } + chunkCount.set(initialCount); + final String n = Thread.currentThread().getName(); + scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, + statThreadPeriod, TimeUnit.SECONDS); + } + + /** + * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have + * not yet created max allowed chunks count. When we have already created max allowed chunks and + * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk + * then. + * Note: Chunks returned by this pool must be put back to the pool after its use. + * @return a chunk + * @see #putbackChunk(Chunk) + * @see #putbackChunks(BlockingQueue) + */ + Chunk getChunk() { + Chunk chunk = reclaimedChunks.poll(); + if (chunk != null) { + chunk.reset(); + reusedChunkCount.incrementAndGet(); + } else { + // Make a chunk iff we have not yet created the maxCount chunks + while (true) { + long created = this.chunkCount.get(); + if (created < this.maxCount) { + chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize); + if (this.chunkCount.compareAndSet(created, created + 1)) { + break; + } + } else { + break; + } + } + } + return chunk; + } + + /** + * Add the chunks to the pool, when the pool achieves the max size, it will + * skip the remaining chunks + * @param chunks + */ + synchronized void putbackChunks(BlockingQueue chunks) { + int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); + Chunk chunk = null; + while ((chunk = chunks.poll()) != null && toAdd > 0) { + reclaimedChunks.add(chunk); + toAdd--; + } + } + + /** + * Add the chunk to the pool, if the pool has achieved the max size, it will + * skip it + * @param chunk + */ + synchronized void putbackChunk(Chunk chunk) { + if (reclaimedChunks.size() < this.maxCount) { + reclaimedChunks.add(chunk); + } + } + + int getPoolSize() { + return this.reclaimedChunks.size(); + } + + /* + * Only used in testing + */ + void clearChunks() { + this.reclaimedChunks.clear(); + } + + private class StatisticsThread extends Thread { + StatisticsThread() { + super("MemStoreChunkPool.StatisticsThread"); + setDaemon(true); + } + + @Override + public void run() { + logStats(); + } + + private void logStats() { + if (!LOG.isDebugEnabled()) return; + long created = chunkCount.get(); + long reused = reusedChunkCount.get(); + long total = created + reused; + LOG.debug("Stats: current pool size=" + reclaimedChunks.size() + + ",created chunk count=" + created + + ",reused chunk count=" + reused + + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( + (float) reused / (float) total, 2))); + } + } + + /** + * @return the global MemStoreChunkPool instance + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", + justification = "Method is called by single thread at the starting of RS") + static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage, + float initialCountPercentage, int chunkSize, boolean offheap) { + if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; + if (chunkPoolDisabled) return null; + + if (poolSizePercentage <= 0) { + chunkPoolDisabled = true; + return null; + } + if (poolSizePercentage > 1.0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); + } + int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize); + if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); + } + int initialCount = (int) (initialCountPercentage * maxCount); + LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) + + ", max count " + maxCount + ", initial count " + initialCount); + GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage, + offheap); + return GLOBAL_INSTANCE; + } + + /** + * @return The singleton instance of this pool. + */ + static MemStoreChunkPool getPool() { + return GLOBAL_INSTANCE; + } + + int getMaxCount() { + return this.maxCount; + } + + @VisibleForTesting + static void clearDisableFlag() { + chunkPoolDisabled = false; + } + + @Override + public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { + // don't do any tuning in case of offheap memstore + if (this.offheap) { + LOG.warn("Not tuning the chunk pool as it is offheap"); + return; + } + int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); + if (newMaxCount != this.maxCount) { + // We need an adjustment in the chunks numbers + if (newMaxCount > this.maxCount) { + // Max chunks getting increased. Just change the variable. Later calls to getChunk() would + // create and add them to Q + LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + } else { + // Max chunks getting decreased. We may need to clear off some of the pooled chunks now + // itself. If the extra chunks are serving already, do not pool those when we get them back + LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + if (this.reclaimedChunks.size() > newMaxCount) { + synchronized (this) { + while (this.reclaimedChunks.size() > newMaxCount) { + this.reclaimedChunks.poll(); + } + } + } + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index 72e937ce0c3..f6d1607f0f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; *

* The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks from * and then doles it out to threads that request slices into the array. These chunks can get pooled - * as well. See {@link ChunkCreator}. + * as well. See {@link MemStoreChunkPool}. *

* The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all * Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; * {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this * cell's data and copies into this area and then recreate a Cell over this copied data. *

- * @see ChunkCreator + * @see MemStoreChunkPool */ @InterfaceAudience.Private public interface MemStoreLAB { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 4fba82d24a8..4e871354621 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -18,26 +18,23 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.nio.ByteBuffer; -import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + /** * A memstore-local allocation buffer. *

@@ -58,8 +55,8 @@ import com.google.common.base.Preconditions; * would provide a performance improvement - probably would speed up the * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached * anyway. - * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}. - * When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks, + * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}. + * When the Chunk comes pool, it can be either an on heap or an off heap backed chunk. The chunks, * which this MemStoreLAB creates on its own (when no chunk available from pool), those will be * always on heap backed. */ @@ -69,15 +66,14 @@ public class MemStoreLABImpl implements MemStoreLAB { static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class); private AtomicReference curChunk = new AtomicReference<>(); - // Lock to manage multiple handlers requesting for a chunk - private ReentrantLock lock = new ReentrantLock(); - - // A set of chunks contained by this memstore LAB + // A queue of chunks from pool contained by this memstore LAB + // TODO: in the future, it would be better to have List implementation instead of Queue, + // as FIFO order is not so important here @VisibleForTesting - Set chunks = new ConcurrentSkipListSet(); + BlockingQueue pooledChunkQueue = null; private final int chunkSize; private final int maxAlloc; - private final ChunkCreator chunkCreator; + private final MemStoreChunkPool chunkPool; // This flag is for closing this instance, its set when clearing snapshot of // memstore @@ -96,12 +92,20 @@ public class MemStoreLABImpl implements MemStoreLAB { public MemStoreLABImpl(Configuration conf) { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); - this.chunkCreator = ChunkCreator.getInstance(); + this.chunkPool = MemStoreChunkPool.getPool(); + // currently chunkQueue is only used for chunkPool + if (this.chunkPool != null) { + // set queue length to chunk pool max count to avoid keeping reference of + // too many non-reclaimable chunks + pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount()); + } + // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument(maxAlloc <= chunkSize, MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); } + @Override public Cell copyCellInto(Cell cell) { int size = KeyValueUtil.length(cell); @@ -114,52 +118,19 @@ public class MemStoreLABImpl implements MemStoreLAB { Chunk c = null; int allocOffset = 0; while (true) { - // Try to get the chunk c = getOrMakeChunk(); - // we may get null because the some other thread succeeded in getting the lock - // and so the current thread has to try again to make its chunk or grab the chunk - // that the other thread created // Try to allocate from this chunk - if (c != null) { - allocOffset = c.alloc(size); - if (allocOffset != -1) { - // We succeeded - this is the common case - small alloc - // from a big buffer - break; - } - // not enough space! - // try to retire this chunk - tryRetireChunk(c); + allocOffset = c.alloc(size); + if (allocOffset != -1) { + // We succeeded - this is the common case - small alloc + // from a big buffer + break; } + // not enough space! + // try to retire this chunk + tryRetireChunk(c); } - return copyToChunkCell(cell, c.getData(), allocOffset, size); - } - - /** - * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid - * out of it - */ - private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { - int tagsLen = cell.getTagsLength(); - if (cell instanceof ExtendedCell) { - ((ExtendedCell) cell).write(buf, offset); - } else { - // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the - // other case also. The data fragments within Cell is copied into buf as in KeyValue - // serialization format only. - KeyValueUtil.appendTo(cell, buf, offset, true); - } - // TODO : write the seqid here. For writing seqId we should create a new cell type so - // that seqId is not used as the state - if (tagsLen == 0) { - // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class - // which directly return tagsLen as 0. So we avoid parsing many length components in - // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell - // call getTagsLength(). - return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); - } else { - return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); - } + return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size); } /** @@ -171,9 +142,9 @@ public class MemStoreLABImpl implements MemStoreLAB { this.closed = true; // We could put back the chunks to pool for reusing only when there is no // opening scanner which will read their data - int count = openScannerCount.get(); - if(count == 0) { - recycleChunks(); + if (chunkPool != null && openScannerCount.get() == 0 + && reclaimed.compareAndSet(false, true)) { + chunkPool.putbackChunks(this.pooledChunkQueue); } } @@ -191,14 +162,9 @@ public class MemStoreLABImpl implements MemStoreLAB { @Override public void decScannerCount() { int count = this.openScannerCount.decrementAndGet(); - if (this.closed && count == 0) { - recycleChunks(); - } - } - - private void recycleChunks() { - if (reclaimed.compareAndSet(false, true)) { - chunkCreator.putbackChunks(chunks); + if (this.closed && chunkPool != null && count == 0 + && reclaimed.compareAndSet(false, true)) { + chunkPool.putbackChunks(this.pooledChunkQueue); } } @@ -224,33 +190,45 @@ public class MemStoreLABImpl implements MemStoreLAB { * allocate a new one from the JVM. */ private Chunk getOrMakeChunk() { - // Try to get the chunk - Chunk c = curChunk.get(); - if (c != null) { - return c; - } - // No current chunk, so we want to allocate one. We race - // against other allocators to CAS in an uninitialized chunk - // (which is cheap to allocate) - if (lock.tryLock()) { - try { - // once again check inside the lock - c = curChunk.get(); - if (c != null) { - return c; - } - c = this.chunkCreator.getChunk(); - if (c != null) { - // set the curChunk. No need of CAS as only one thread will be here - curChunk.set(c); - chunks.add(c.getId()); - return c; - } - } finally { - lock.unlock(); + while (true) { + // Try to get the chunk + Chunk c = curChunk.get(); + if (c != null) { + return c; } + + // No current chunk, so we want to allocate one. We race + // against other allocators to CAS in an uninitialized chunk + // (which is cheap to allocate) + if (chunkPool != null) { + c = chunkPool.getChunk(); + } + boolean pooledChunk = false; + if (c != null) { + // This is chunk from pool + pooledChunk = true; + } else { + c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap. + } + if (curChunk.compareAndSet(null, c)) { + // we won race - now we need to actually do the expensive + // allocation step + c.init(); + if (pooledChunk) { + if (!this.closed && !this.pooledChunkQueue.offer(c)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " + + pooledChunkQueue.size()); + } + } + } + return c; + } else if (pooledChunk) { + chunkPool.putbackChunk(c); + } + // someone else won race - that's fine, we'll try to grab theirs + // in the next iteration of the loop. } - return null; } @VisibleForTesting @@ -258,15 +236,8 @@ public class MemStoreLABImpl implements MemStoreLAB { return this.curChunk.get(); } - @VisibleForTesting + BlockingQueue getPooledChunks() { - BlockingQueue pooledChunks = new LinkedBlockingQueue<>(); - for (Integer id : this.chunks) { - Chunk chunk = chunkCreator.getChunk(id); - if (chunk != null && chunk.isFromPool()) { - pooledChunks.add(chunk); - } - } - return pooledChunks; + return this.pooledChunkQueue; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java deleted file mode 100644 index a8ba50c8558..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.nio.ByteBuffer; - -import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ByteBufferUtils; - - -/** - * ByteBuffer based cell which has the chunkid at the 0th offset and with no tags - * @see MemStoreLAB - */ -@InterfaceAudience.Private -public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue { - - public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) { - super(buf, offset, length); - } - - public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { - super(buf, offset, length, seqId); - } - - @Override - public int getChunkId() { - // The chunkId is embedded at the 0th offset of the bytebuffer - return ByteBufferUtils.toInt(buf, 0); - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java index e244a33d802..ed98cfaae09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java @@ -21,27 +21,34 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import com.google.common.base.Preconditions; + /** * An off heap chunk implementation. */ @InterfaceAudience.Private public class OffheapChunk extends Chunk { - OffheapChunk(int size, int id) { - // better if this is always created fromPool. This should not be called - super(size, id); - } - - OffheapChunk(int size, int id, boolean fromPool) { - super(size, id, fromPool); - assert fromPool == true; + OffheapChunk(int size) { + super(size); } @Override - void allocateDataBuffer() { - if (data == null) { - data = ByteBuffer.allocateDirect(this.size); - data.putLong(0, this.getId()); + public void init() { + assert nextFreeOffset.get() == UNINITIALIZED; + try { + if (data == null) { + data = ByteBuffer.allocateDirect(this.size); + } + } catch (OutOfMemoryError e) { + boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); + assert failInit; // should be true. + throw e; } + // Mark that it's ready for use + boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); + // We should always succeed the above CAS since only one thread + // calls init()! + Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java index da34e24bf7d..bd33cb5d196 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java @@ -21,25 +21,33 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import com.google.common.base.Preconditions; + /** * An on heap chunk implementation. */ @InterfaceAudience.Private public class OnheapChunk extends Chunk { - OnheapChunk(int size, int id) { - super(size, id); + OnheapChunk(int size) { + super(size); } - OnheapChunk(int size, int id, boolean fromPool) { - super(size, id, fromPool); - } - - @Override - void allocateDataBuffer() { - if (data == null) { - data = ByteBuffer.allocate(this.size); - data.putLong(0, this.getId()); + public void init() { + assert nextFreeOffset.get() == UNINITIALIZED; + try { + if (data == null) { + data = ByteBuffer.allocate(this.size); + } + } catch (OutOfMemoryError e) { + boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); + assert failInit; // should be true. + throw e; } + // Mark that it's ready for use + boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); + // We should always succeed the above CAS since only one thread + // calls init()! + Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 65631220c3e..82c2eabf6db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -96,8 +96,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.ChunkCreator; -import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -2428,7 +2426,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir, final Configuration conf, final HTableDescriptor htd, boolean initialize) throws IOException { - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); WAL wal = createWal(conf, rootDir, info); return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 8d8b6df5fe2..422c54bf8a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -50,10 +49,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -400,7 +397,6 @@ public class TestCoprocessorInterface { for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); } - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(tableName, null, null, false); Path path = new Path(DIR + callingMethod); Region r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index b99087d54ac..80d0e3ac139 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -47,12 +47,10 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -154,7 +152,6 @@ public class TestRegionObserverScannerOpenHook { for (byte[] family : families) { htd.addFamily(new HColumnDescriptor(family)); } - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); Path path = new Path(DIR + callingMethod); WAL wal = HBaseTestingUtility.createWal(conf, path, info); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java index 15d449db297..2e44dee10cb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java @@ -34,9 +34,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; @@ -102,7 +100,6 @@ public class TestRegionObserverStacking extends TestCase { for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); } - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); Path path = new Path(DIR + callingMethod); HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java index fae724702db..f1775d0e470 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java @@ -40,10 +40,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -344,7 +342,6 @@ public class TestScannerFromBucketCache { private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly, byte[]... families) throws IOException { - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log"); HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey); final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index 32bce263b80..cc73d9df0c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -65,8 +65,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.ChunkCreator; -import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -75,7 +73,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.Triple; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -91,10 +88,6 @@ public class TestCatalogJanitor { @Rule public TestName name = new TestName(); - @BeforeClass - public static void setup() throws Exception { - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); - } /** * Mock MasterServices for tests below. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 096c5ef59ae..418aadf562e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -241,7 +241,7 @@ public class TestBulkLoad { for (byte[] family : families) { hTableDescriptor.addFamily(new HColumnDescriptor(family)); } - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + // TODO We need a way to do this without creating files return HRegion.createHRegion(hRegionInfo, new Path(testFolder.newFolder().toURI()), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java index 09877b01b46..3b4d0685b51 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase { descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true); CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - ChunkCreator.chunkPoolDisabled = false; + MemStoreChunkPool.chunkPoolDisabled = false; } /* Create and test CellSet based on CellArrayMap */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 9e90f3e39f9..a888c4527a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -51,7 +50,7 @@ import static org.junit.Assert.assertTrue; public class TestCompactingMemStore extends TestDefaultMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class); - protected static ChunkCreator chunkCreator; + protected static MemStoreChunkPool chunkPool; protected HRegion region; protected RegionServicesForStores regionServicesForStores; protected HStore store; @@ -66,7 +65,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @After public void tearDown() throws Exception { - chunkCreator.clearChunksInPool(); + chunkPool.clearChunks(); } @Override @@ -85,21 +84,15 @@ public class TestCompactingMemStore extends TestDefaultMemStore { conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf); HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); - HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar")); - htd.addFamily(hcd); - HRegionInfo info = - new HRegionInfo(TableName.valueOf("foobar"), null, null, false); - WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); - this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); - //this.region = hbaseUtility.createTestRegion("foobar", hcd); + this.region = hbaseUtility.createTestRegion("foobar", hcd); this.regionServicesForStores = region.getRegionServicesForStores(); this.store = new HStore(region, hcd, conf); long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, - globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); - assertTrue(chunkCreator != null); + chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, + MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); + assertTrue(chunkPool != null); } /** @@ -397,7 +390,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkCreator.getPoolSize(); + int chunkCount = chunkPool.getPoolSize(); assertTrue(chunkCount > 0); } @@ -441,16 +434,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkCreator.getPoolSize() == 0); + assertTrue(chunkPool.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkCreator.getPoolSize() > 0); + assertTrue(chunkPool.getPoolSize() > 0); // clear chunks - chunkCreator.clearChunksInPool(); + chunkPool.clearChunks(); // Creating another snapshot @@ -471,7 +464,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkCreator.getPoolSize() > 0); + assertTrue(chunkPool.getPoolSize() > 0); } @Test @@ -523,16 +516,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf1, 3, val), null); assertEquals(3, memstore.getActive().getCellsCount()); - assertTrue(chunkCreator.getPoolSize() == 0); + assertTrue(chunkPool.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkCreator.getPoolSize() > 0); + assertTrue(chunkPool.getPoolSize() > 0); // clear chunks - chunkCreator.clearChunksInPool(); + chunkPool.clearChunks(); // Creating another snapshot @@ -560,7 +553,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkCreator.getPoolSize() > 0); + assertTrue(chunkPool.getPoolSize() > 0); } ////////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index 66e107ae656..5a48455eeb2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -44,13 +44,17 @@ import java.util.List; public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class); + //private static MemStoreChunkPool chunkPool; + //private HRegion region; + //private RegionServicesForStores regionServicesForStores; + //private HStore store; ////////////////////////////////////////////////////////////////////////////// // Helpers ////////////////////////////////////////////////////////////////////////////// @Override public void tearDown() throws Exception { - chunkCreator.clearChunksInPool(); + chunkPool.clearChunks(); } @Override public void setUp() throws Exception { @@ -404,16 +408,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkCreator.getPoolSize() == 0); + assertTrue(chunkPool.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkCreator.getPoolSize() > 0); + assertTrue(chunkPool.getPoolSize() > 0); // clear chunks - chunkCreator.clearChunksInPool(); + chunkPool.clearChunks(); // Creating another snapshot @@ -434,7 +438,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkCreator.getPoolSize() > 0); + assertTrue(chunkPool.getPoolSize() > 0); } @Test @@ -468,7 +472,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkCreator.getPoolSize(); + int chunkCount = chunkPool.getPoolSize(); assertTrue(chunkCount > 0); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java index e32036877e8..8e85730a524 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java @@ -164,7 +164,6 @@ public class TestCompactionArchiveConcurrentClose { HRegionFileSystem fs = new WaitingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java index e7fcf18a107..89b23689793 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java @@ -174,7 +174,6 @@ public class TestCompactionArchiveIOException { private HRegion initHRegion(HTableDescriptor htd, HRegionInfo info) throws IOException { Configuration conf = testUtil.getConfiguration(); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName()); Path regionDir = new Path(tableDir, info.getEncodedName()); Path storeDir = new Path(regionDir, htd.getColumnFamilies()[0].getNameAsString()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java index bff5bec207a..7154511e1f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java @@ -104,7 +104,6 @@ public class TestCompactionPolicy { HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); hlog = new FSHLog(fs, basedir, logName, conf); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); region = HRegion.createHRegion(info, basedir, conf, htd, hlog); region.close(); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 41b304b3309..7434eb1d7b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.wal.WALFactory; -import org.junit.AfterClass; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -85,7 +84,6 @@ public class TestDefaultMemStore { protected static final byte[] FAMILY = Bytes.toBytes("column"); protected MultiVersionConcurrencyControl mvcc; protected AtomicLong startSeqNum = new AtomicLong(0); - protected ChunkCreator chunkCreator; private String getName() { return this.name.getMethodName(); @@ -94,17 +92,9 @@ public class TestDefaultMemStore { @Before public void setUp() throws Exception { internalSetUp(); - // no pool - this.chunkCreator = - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); this.memstore = new DefaultMemStore(); } - @AfterClass - public static void tearDownClass() throws Exception { - ChunkCreator.getInstance().clearChunkIds(); - } - protected void internalSetUp() throws Exception { this.mvcc = new MultiVersionConcurrencyControl(); } @@ -139,9 +129,7 @@ public class TestDefaultMemStore { assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize()); // make sure chunk size increased even when writing the same cell, if using MSLAB if (msLab instanceof MemStoreLABImpl) { - // since we add the chunkID at the 0th offset of the chunk and the - // chunkid is a long we need to account for those 8 bytes - assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG, + assertEquals(2 * Segment.getCellLength(kv), ((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset()); } } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 24e850daff5..73fb9cf4c0e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -266,7 +266,6 @@ public class TestFailedAppendAndSync { */ public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) throws IOException { - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index 0f24a24f0c2..b416c7d9e53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -153,7 +153,7 @@ public class TestHMobStore { htd.addFamily(hcd); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); final WALFactory wals = new WALFactory(walConf, null, methodName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 095f4bd27dc..d56d6ecf2ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -116,7 +116,6 @@ import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterAllFilter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.NullComparator; @@ -4932,7 +4931,6 @@ public class TestHRegion { String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) throws IOException { Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey); final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri); return initHRegion(tableName, startKey, stopKey, isReadOnly, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 6eed7df9faa..005464262bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -153,7 +153,7 @@ public class TestHRegionReplayEvents { } time = System.currentTimeMillis(); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + primaryHri = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, time, 0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 1768801a647..37a7664b77b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -48,30 +48,30 @@ import static org.junit.Assert.assertTrue; @Category({RegionServerTests.class, SmallTests.class}) public class TestMemStoreChunkPool { private final static Configuration conf = new Configuration(); - private static ChunkCreator chunkCreator; + private static MemStoreChunkPool chunkPool; private static boolean chunkPoolDisabledBeforeTest; @BeforeClass public static void setUpBeforeClass() throws Exception { conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - chunkPoolDisabledBeforeTest = ChunkCreator.chunkPoolDisabled; - ChunkCreator.chunkPoolDisabled = false; + chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; + MemStoreChunkPool.chunkPoolDisabled = false; long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, - globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); - assertTrue(chunkCreator != null); + chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, + MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); + assertTrue(chunkPool != null); } @AfterClass public static void tearDownAfterClass() throws Exception { - ChunkCreator.chunkPoolDisabled = chunkPoolDisabledBeforeTest; + MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest; } @Before public void tearDown() throws Exception { - chunkCreator.clearChunksInPool(); + chunkPool.clearChunks(); } @Test @@ -90,7 +90,7 @@ public class TestMemStoreChunkPool { int size = KeyValueUtil.length(kv); ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); if (newKv.getBuffer() != lastBuffer) { - expectedOff = 8; + expectedOff = 0; lastBuffer = newKv.getBuffer(); } assertEquals(expectedOff, newKv.getOffset()); @@ -100,14 +100,14 @@ public class TestMemStoreChunkPool { } // chunks will be put back to pool after close mslab.close(); - int chunkCount = chunkCreator.getPoolSize(); + int chunkCount = chunkPool.getPoolSize(); assertTrue(chunkCount > 0); // reconstruct mslab mslab = new MemStoreLABImpl(conf); // chunk should be got from the pool, so we can reuse it. KeyValue kv = new KeyValue(rk, cf, q, new byte[10]); mslab.copyCellInto(kv); - assertEquals(chunkCount - 1, chunkCreator.getPoolSize()); + assertEquals(chunkCount - 1, chunkPool.getPoolSize()); } @Test @@ -143,7 +143,7 @@ public class TestMemStoreChunkPool { } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkCreator.getPoolSize(); + int chunkCount = chunkPool.getPoolSize(); assertTrue(chunkCount > 0); } @@ -189,16 +189,16 @@ public class TestMemStoreChunkPool { } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkCreator.getPoolSize() == 0); + assertTrue(chunkPool.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkCreator.getPoolSize() > 0); + assertTrue(chunkPool.getPoolSize() > 0); // clear chunks - chunkCreator.clearChunksInPool(); + chunkPool.clearChunks(); // Creating another snapshot snapshot = memstore.snapshot(); @@ -218,20 +218,20 @@ public class TestMemStoreChunkPool { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkCreator.getPoolSize() > 0); + assertTrue(chunkPool.getPoolSize() > 0); } @Test public void testPutbackChunksMultiThreaded() throws Exception { + MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE; final int maxCount = 10; final int initialCount = 5; - final int chunkSize = 40; + final int chunkSize = 30; final int valSize = 7; - ChunkCreator oldCreator = ChunkCreator.getInstance(); - ChunkCreator newCreator = new ChunkCreator(chunkSize, false, 400, 1, 0.5f, null); - assertEquals(initialCount, newCreator.getPoolSize()); - assertEquals(maxCount, newCreator.getMaxCount()); - ChunkCreator.INSTANCE = newCreator;// Replace the global ref with the new one we created. + MemStoreChunkPool pool = new MemStoreChunkPool(chunkSize, maxCount, initialCount, 1, false); + assertEquals(initialCount, pool.getPoolSize()); + assertEquals(maxCount, pool.getMaxCount()); + MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created. // Used it for the testing. Later in finally we put // back the original final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), @@ -258,9 +258,9 @@ public class TestMemStoreChunkPool { t1.join(); t2.join(); t3.join(); - assertTrue(newCreator.getPoolSize() <= maxCount); + assertTrue(pool.getPoolSize() <= maxCount); } finally { - ChunkCreator.INSTANCE = oldCreator; + MemStoreChunkPool.GLOBAL_INSTANCE = oldPool; } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 6696e432f5a..141b8023e2f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -63,8 +63,8 @@ public class TestMemStoreLAB { public static void setUpBeforeClass() throws Exception { long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, - 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, + MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); } /** @@ -76,7 +76,6 @@ public class TestMemStoreLAB { MemStoreLAB mslab = new MemStoreLABImpl(); int expectedOff = 0; ByteBuffer lastBuffer = null; - long lastChunkId = -1; // 100K iterations by 0-1K alloc -> 50MB expected // should be reasonable for unit test and also cover wraparound // behavior @@ -86,13 +85,8 @@ public class TestMemStoreLAB { int size = KeyValueUtil.length(kv); ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); if (newKv.getBuffer() != lastBuffer) { - // since we add the chunkID at the 0th offset of the chunk and the - // chunkid is a long we need to account for those 8 bytes - expectedOff = Bytes.SIZEOF_LONG; + expectedOff = 0; lastBuffer = newKv.getBuffer(); - long chunkId = newKv.getBuffer().getLong(0); - assertTrue("chunkid should be different", chunkId != lastChunkId); - lastChunkId = chunkId; } assertEquals(expectedOff, newKv.getOffset()); assertTrue("Allocation overruns buffer", @@ -142,21 +136,23 @@ public class TestMemStoreLAB { }; ctx.addThread(t); } - + ctx.startThreads(); while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) { Thread.sleep(10); } ctx.stop(); + // Partition the allocations by the actual byte[] they point into, // make sure offsets are unique for each chunk Map> mapsByChunk = Maps.newHashMap(); - + int sizeCounted = 0; for (AllocRecord rec : Iterables.concat(allocations)) { sizeCounted += rec.size; if (rec.size == 0) continue; + Map mapForThisByteArray = mapsByChunk.get(rec.alloc); if (mapForThisByteArray == null) { @@ -171,9 +167,7 @@ public class TestMemStoreLAB { // Now check each byte array to make sure allocations don't overlap for (Map allocsInChunk : mapsByChunk.values()) { - // since we add the chunkID at the 0th offset of the chunk and the - // chunkid is a long we need to account for those 8 bytes - int expectedOff = Bytes.SIZEOF_LONG; + int expectedOff = 0; for (AllocRecord alloc : allocsInChunk.values()) { assertEquals(expectedOff, alloc.offset); assertTrue("Allocation overruns buffer", @@ -181,6 +175,7 @@ public class TestMemStoreLAB { expectedOff += alloc.size; } } + } /** @@ -199,7 +194,7 @@ public class TestMemStoreLAB { // set chunk size to default max alloc size, so we could easily trigger chunk retirement conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT); // reconstruct mslab - ChunkCreator.clearDisableFlag(); + MemStoreChunkPool.clearDisableFlag(); mslab = new MemStoreLABImpl(conf); // launch multiple threads to trigger frequent chunk retirement List threads = new ArrayList<>(); @@ -228,8 +223,6 @@ public class TestMemStoreLAB { } // close the mslab mslab.close(); - // none of the chunkIds would have been returned back - assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0); // make sure all chunks reclaimed or removed from chunk queue int queueLength = mslab.getPooledChunks().size(); assertTrue("All chunks in chunk queue should be reclaimed or removed" diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java deleted file mode 100644 index f38a75e723d..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.lang.management.ManagementFactory; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ByteBufferKeyValue; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({RegionServerTests.class, SmallTests.class}) -public class TestMemstoreLABWithoutPool { - private final static Configuration conf = new Configuration(); - - private static final byte[] rk = Bytes.toBytes("r1"); - private static final byte[] cf = Bytes.toBytes("f"); - private static final byte[] q = Bytes.toBytes("q"); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() - .getMax() * 0.8); - // disable pool - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT + Bytes.SIZEOF_LONG, false, globalMemStoreLimit, - 0.0f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); - } - - /** - * Test a bunch of random allocations - */ - @Test - public void testLABRandomAllocation() { - Random rand = new Random(); - MemStoreLAB mslab = new MemStoreLABImpl(); - int expectedOff = 0; - ByteBuffer lastBuffer = null; - long lastChunkId = -1; - // 100K iterations by 0-1K alloc -> 50MB expected - // should be reasonable for unit test and also cover wraparound - // behavior - for (int i = 0; i < 100000; i++) { - int valSize = rand.nextInt(1000); - KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); - int size = KeyValueUtil.length(kv); - ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); - if (newKv.getBuffer() != lastBuffer) { - // since we add the chunkID at the 0th offset of the chunk and the - // chunkid is a long we need to account for those 8 bytes - expectedOff = Bytes.SIZEOF_LONG; - lastBuffer = newKv.getBuffer(); - long chunkId = newKv.getBuffer().getLong(0); - assertTrue("chunkid should be different", chunkId != lastChunkId); - lastChunkId = chunkId; - } - assertEquals(expectedOff, newKv.getOffset()); - assertTrue("Allocation overruns buffer", - newKv.getOffset() + size <= newKv.getBuffer().capacity()); - expectedOff += size; - } - } - - /** - * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure - * there's no memory leak (HBASE-16195) - * @throws Exception if any error occurred - */ - @Test - public void testLABChunkQueueWithMultipleMSLABs() throws Exception { - Configuration conf = HBaseConfiguration.create(); - MemStoreLABImpl[] mslab = new MemStoreLABImpl[10]; - for (int i = 0; i < 10; i++) { - mslab[i] = new MemStoreLABImpl(conf); - } - // launch multiple threads to trigger frequent chunk retirement - List threads = new ArrayList<>(); - // create smaller sized kvs - final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), - new byte[0]); - for (int i = 0; i < 10; i++) { - for (int j = 0; j < 10; j++) { - threads.add(getChunkQueueTestThread(mslab[i], "testLABChunkQueue-" + j, kv)); - } - } - for (Thread thread : threads) { - thread.start(); - } - // let it run for some time - Thread.sleep(3000); - for (Thread thread : threads) { - thread.interrupt(); - } - boolean threadsRunning = true; - boolean alive = false; - while (threadsRunning) { - alive = false; - for (Thread thread : threads) { - if (thread.isAlive()) { - alive = true; - break; - } - } - if (!alive) { - threadsRunning = false; - } - } - // close the mslab - for (int i = 0; i < 10; i++) { - mslab[i].close(); - } - // all of the chunkIds would have been returned back - assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0); - } - - private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, - Cell cellToCopyInto) { - Thread thread = new Thread() { - boolean stopped = false; - - @Override - public void run() { - while (!stopped) { - // keep triggering chunk retirement - mslab.copyCellInto(cellToCopyInto); - } - } - - @Override - public void interrupt() { - this.stopped = true; - } - }; - thread.setName(threadName); - thread.setDaemon(true); - return thread; - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java index 7160e5e9cba..4315bd42f42 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java @@ -108,7 +108,6 @@ public class TestRecoveredEdits { } }; Path hbaseRootDir = TEST_UTIL.getDataTestDir(); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); Path tableDir = FSUtils.getTableDir(hbaseRootDir, htd.getTableName()); HRegionFileSystem hrfs = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java index ad56081a6c8..5d11c0ece47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java @@ -83,7 +83,6 @@ public class TestRegionIncrement { private HRegion getRegion(final Configuration conf, final String tableName) throws IOException { WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), TEST_UTIL.getDataTestDir().toString(), conf); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 5355c77fca0..0d339b1ab15 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -178,7 +178,6 @@ public class TestStore { } else { htd.addFamily(hcd); } - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java index 99dd00db995..3cdb227dbcb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java @@ -111,7 +111,6 @@ public class TestStoreFileRefresherChore { final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); final WALFactory wals = new WALFactory(walConf, null, "log_" + replicaId); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()), conf, htd, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 51260a6f71b..4f247b0cb37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -588,7 +588,6 @@ public class TestWALLockup { */ public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) throws IOException { - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java index e63bad91240..994779f6639 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java @@ -98,7 +98,6 @@ public class TestWALMonotonicallyIncreasingSeqId { FSUtils.setRootDir(walConf, tableDir); this.walConf = walConf; wals = new WALFactory(walConf, null, "log_" + replicaId); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()), conf, htd, null); region.initialize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java index 057b9bfd348..f976b492711 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java @@ -37,9 +37,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -290,7 +288,6 @@ public class TestDurability { throw new IOException("Failed delete of " + path); } } - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return HRegion.createHRegion(info, path, CONF, htd, log); }