blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
+ int nameComparison = a.getHfileName().compareTo(b.getHfileName());
+ if (nameComparison != 0) {
+ return nameComparison;
+ }
+ return Long.compare(a.getOffset(), b.getOffset());
+ });
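This comparator orders cached keys by HFile name first and offset second, so all blocks of one HFile form a contiguous range in the skip-list. A hedged sketch of how such an index is typically consumed (`hfileName` and the range bounds are illustrative, not part of this hunk):

```java
// Sketch: with (name, offset) ordering, evicting one file's blocks is a range scan.
// BlockCacheKey(String, long) is the constructor used elsewhere in this patch.
NavigableSet<BlockCacheKey> blocksOfFile = blocksByHFile.subSet(
    new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
    new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
blocksOfFile.forEach(this::evictBlock); // evict every cached block of that HFile
```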
/** Statistics thread schedule pool (for heavy debugging, could remove) */
private transient final ScheduledExecutorService scheduleThreadPool =
@@ -249,16 +237,14 @@ public class BucketCache implements BlockCache, HeapSize {
private float memoryFactor;
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
- int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
- IOException {
+ int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
- persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
+ persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
}
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
- int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
- Configuration conf)
- throws FileNotFoundException, IOException {
+ int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
+ Configuration conf) throws IOException {
this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
this.writerThreads = new WriterThread[writerThreadNum];
long blockNumCapacity = capacity / blockSize;
@@ -444,7 +430,8 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
// Stuff the entry into the RAM cache so it can get drained to the persistent store
RAMQueueEntry re =
- new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
+ new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory,
+ createRecycler(cacheKey));
/**
     * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
     * same key in ramCache; the heap size of the bucket cache needs to be updated when replacing an entry from
@@ -509,21 +496,16 @@ public class BucketCache implements BlockCache, HeapSize {
// maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
// existence here.
if (bucketEntry.equals(backingMap.get(key))) {
- // TODO : change this area - should be removed after server cells and
- // 12295 are available
- int len = bucketEntry.getLength();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
- }
- Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
- bucketEntry.deserializerReference());
- long timeTaken = System.nanoTime() - start;
+          // Read the block from the IOEngine based on the bucketEntry's offset and length. NOTICE:
+          // the block will use the refCnt of bucketEntry, which means that if two HFileBlocks map
+          // to the same BucketEntry, then all three will share the same refCnt.
+ Cacheable cachedBlock = ioEngine.read(bucketEntry);
+          // The RPC path starts to reference this block, so retain it here.
+ cachedBlock.retain();
+ // Update the cache statistics.
if (updateCacheMetrics) {
cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
- cacheStats.ioHit(timeTaken);
- }
- if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
- bucketEntry.incrementRefCountAndGet();
+ cacheStats.ioHit(System.nanoTime() - start);
}
bucketEntry.access(accessCount.incrementAndGet());
if (this.ioErrorStartTime > 0) {
@@ -554,40 +536,58 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
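Condensing the read path above: the block returned by `ioEngine.read(bucketEntry)` shares the BucketEntry's refCnt, so the caller-side contract is retain-before-use, release-after-ship. A sketch using only calls from this patch (in the real code the release happens later, when the RPC response ships; the try/finally here is only to pair the calls):

```java
Cacheable cachedBlock = ioEngine.read(bucketEntry); // shares bucketEntry's refCnt
cachedBlock.retain();                               // +1 for the RPC about to read it
try {
  // ... serve the block in the RPC response ...
} finally {
  cachedBlock.release();                            // -1 once the response has shipped
}
```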
+  /**
+   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:
+   * 1. Close an HFile, and clear all cached blocks.
+   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.
+   *
+   * Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then we
+   * try to evict it from the backingMap. Here we only need to free the reference from the bucket
+   * cache by calling {@link BucketEntry#markAsEvicted}. If there are still RPCs referring to this
+   * block, it can only be de-allocated once all of them have released it.
+   *
+   * NOTICE: we need to grab the write offset lock before releasing the reference from the bucket
+   * cache. If we don't, {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)} may
+   * read a {@link BucketEntry} with refCnt = 0, which is a memory leak.
+   * @param cacheKey Block to evict
+   * @return true if the block has been evicted successfully.
+   */
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
- return evictBlock(cacheKey, true);
- }
-
- // does not check for the ref count. Just tries to evict it if found in the
- // bucket map
- private boolean forceEvict(BlockCacheKey cacheKey) {
if (!cacheEnabled) {
return false;
}
boolean existed = removeFromRamCache(cacheKey);
- BucketEntry bucketEntry = backingMap.get(cacheKey);
- if (bucketEntry == null) {
+ BucketEntry be = backingMap.get(cacheKey);
+ if (be == null) {
if (existed) {
cacheStats.evicted(0, cacheKey.isPrimary());
- return true;
- } else {
- return false;
}
+ return existed;
+ } else {
+ return be.withWriteLock(offsetLock, be::markAsEvicted);
}
- ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
- try {
- lock.writeLock().lock();
- if (backingMap.remove(cacheKey, bucketEntry)) {
- blockEvicted(cacheKey, bucketEntry, !existed);
- } else {
- return false;
+ }
+
+ private Recycler createRecycler(BlockCacheKey cacheKey) {
+ return () -> {
+ if (!cacheEnabled) {
+ return;
}
- } finally {
- lock.writeLock().unlock();
- }
- cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
- return true;
+ boolean existed = removeFromRamCache(cacheKey);
+ BucketEntry be = backingMap.get(cacheKey);
+ if (be == null && existed) {
+ cacheStats.evicted(0, cacheKey.isPrimary());
+ } else if (be != null) {
+ be.withWriteLock(offsetLock, () -> {
+ if (backingMap.remove(cacheKey, be)) {
+ blockEvicted(cacheKey, be, !existed);
+ cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary());
+ }
+ return null;
+ });
+ }
+ };
}
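The `Recycler` handed to `RefCnt.create(...)` must run exactly once, on the release that drops the count to zero. A self-contained toy model of that contract (plain Java; HBase's actual RefCnt is built on Netty reference counting, so this is only an illustration):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model: the count starts at 1 (the backingMap's reference); retain/release
// move it up and down, and the recycler fires once when the count reaches zero.
final class ToyRefCnt {
  private final AtomicInteger cnt = new AtomicInteger(1);
  private final Runnable recycler;

  ToyRefCnt(Runnable recycler) {
    this.recycler = recycler;
  }

  void retain() {
    cnt.incrementAndGet();
  }

  /** @return true if this release freed the underlying resource. */
  boolean release() {
    if (cnt.decrementAndGet() == 0) {
      recycler.run(); // de-allocate, e.g. free the bucket area
      return true;
    }
    return false;
  }
}
```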
private boolean removeFromRamCache(BlockCacheKey cacheKey) {
@@ -599,53 +599,6 @@ public class BucketCache implements BlockCache, HeapSize {
});
}
- public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
- if (!cacheEnabled) {
- return false;
- }
- boolean existed = removeFromRamCache(cacheKey);
- BucketEntry bucketEntry = backingMap.get(cacheKey);
- if (bucketEntry == null) {
- if (existed) {
- cacheStats.evicted(0, cacheKey.isPrimary());
- return true;
- } else {
- return false;
- }
- }
- ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
- try {
- lock.writeLock().lock();
- int refCount = bucketEntry.getRefCount();
- if (refCount == 0) {
- if (backingMap.remove(cacheKey, bucketEntry)) {
- blockEvicted(cacheKey, bucketEntry, !existed);
- } else {
- return false;
- }
- } else {
- if(!deletedBlock) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("This block " + cacheKey + " is still referred by " + refCount
- + " readers. Can not be freed now");
- }
- return false;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("This block " + cacheKey + " is still referred by " + refCount
- + " readers. Can not be freed now. Hence will mark this"
- + " for evicting at a later point");
- }
- bucketEntry.markForEvict();
- }
- }
- } finally {
- lock.writeLock().unlock();
- }
- cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
- return true;
- }
-
/*
* Statistics thread. Periodically output cache statistics to the log.
*/
@@ -732,19 +685,17 @@ public class BucketCache implements BlockCache, HeapSize {
if (completelyFreeBucketsNeeded != 0) {
// First we will build a set where the offsets are reference counted, usually
// this set is small around O(Handler Count) unless something else is wrong
-      Set<Integer> inUseBuckets = new HashSet<Integer>();
- for (BucketEntry entry : backingMap.values()) {
- if (entry.getRefCount() != 0) {
- inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
+      Set<Integer> inUseBuckets = new HashSet<>();
+ backingMap.forEach((k, be) -> {
+ if (be.isRpcRef()) {
+ inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
}
- }
-
-      Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets(
-        inUseBuckets, completelyFreeBucketsNeeded);
+ });
+      Set<Integer> candidateBuckets =
+        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
- if (candidateBuckets.contains(bucketAllocator
- .getBucketIndex(entry.getValue().offset()))) {
- evictBlock(entry.getKey(), false);
+ if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
+ entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted);
}
}
}
@@ -921,7 +872,9 @@ public class BucketCache implements BlockCache, HeapSize {
// Blocks
entries = getRAMQueueEntries(inputQueue, entries);
} catch (InterruptedException ie) {
- if (!cacheEnabled) break;
+ if (!cacheEnabled || !writerEnabled) {
+ break;
+ }
}
doDrain(entries);
} catch (Exception ioe) {
@@ -949,13 +902,10 @@ public class BucketCache implements BlockCache, HeapSize {
private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
BucketEntry previousEntry = backingMap.put(key, bucketEntry);
if (previousEntry != null && previousEntry != bucketEntry) {
- ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
- lock.writeLock().lock();
- try {
+ previousEntry.withWriteLock(offsetLock, () -> {
blockEvicted(key, previousEntry, false);
- } finally {
- lock.writeLock().unlock();
- }
+ return null;
+ });
}
}
@@ -1049,22 +999,13 @@ public class BucketCache implements BlockCache, HeapSize {
});
if (!existed && bucketEntries[i] != null) {
// Block should have already been evicted. Remove it and free space.
- ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
- try {
- lock.writeLock().lock();
- int refCount = bucketEntries[i].getRefCount();
- if (refCount == 0) {
- if (backingMap.remove(key, bucketEntries[i])) {
- blockEvicted(key, bucketEntries[i], false);
- } else {
- bucketEntries[i].markForEvict();
- }
- } else {
- bucketEntries[i].markForEvict();
+ final BucketEntry bucketEntry = bucketEntries[i];
+ bucketEntry.withWriteLock(offsetLock, () -> {
+ if (backingMap.remove(key, bucketEntry)) {
+ blockEvicted(key, bucketEntry, false);
}
- } finally {
- lock.writeLock().unlock();
- }
+ return null;
+ });
}
}
@@ -1077,17 +1018,16 @@ public class BucketCache implements BlockCache, HeapSize {
}
/**
- * Blocks until elements available in {@code q} then tries to grab as many as possible
- * before returning.
- * @param receptacle Where to stash the elements taken from queue. We clear before we use it
- * just in case.
+ * Blocks until elements available in {@code q} then tries to grab as many as possible before
+ * returning.
+ * @param receptacle Where to stash the elements taken from queue. We clear before we use it just
+ * in case.
* @param q The queue to take from.
* @return {@code receptacle} laden with elements taken from the queue or empty if none found.
*/
@VisibleForTesting
-  static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
-      final List<RAMQueueEntry> receptacle)
-      throws InterruptedException {
+  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
+      List<RAMQueueEntry> receptacle) throws InterruptedException {
// Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
// ok even if list grew to accommodate thousands.
receptacle.clear();
@@ -1311,155 +1251,6 @@ public class BucketCache implements BlockCache, HeapSize {
return numEvicted;
}
- /**
- * Item in cache. We expect this to be where most memory goes. Java uses 8
- * bytes just for object headers; after this, we want to use as little as
- * possible - so we only use 8 bytes, but in order to do so we end up messing
- * around with all this Java casting stuff. Offset stored as 5 bytes that make
- * up the long. Doubt we'll see devices this big for ages. Offsets are divided
- * by 256. So 5 bytes gives us 256TB or so.
- */
- static class BucketEntry implements Serializable {
- private static final long serialVersionUID = -6741504807982257534L;
-
- // access counter comparator, descending order
-    static final Comparator<BucketEntry> COMPARATOR = Comparator
-        .comparingLong(BucketEntry::getAccessCounter).reversed();
-
- private int offsetBase;
- private int length;
- private byte offset1;
-
- /**
- * The index of the deserializer that can deserialize this BucketEntry content.
- * See {@link CacheableDeserializerIdManager} for hosting of index to serializers.
- */
- byte deserialiserIndex;
-
- private volatile long accessCounter;
- private BlockPriority priority;
-
- /**
- * Time this block was cached. Presumes we are created just before we are added to the cache.
- */
- private final long cachedTime = System.nanoTime();
-
- BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
- setOffset(offset);
- this.length = length;
- this.accessCounter = accessCounter;
- if (inMemory) {
- this.priority = BlockPriority.MEMORY;
- } else {
- this.priority = BlockPriority.SINGLE;
- }
- }
-
- long offset() { // Java has no unsigned numbers
- long o = ((long) offsetBase) & 0xFFFFFFFFL; //This needs the L cast otherwise it will be sign extended as a negative number.
- o += (((long) (offset1)) & 0xFF) << 32; //The 0xFF here does not need the L cast because it is treated as a positive int.
- return o << 8;
- }
-
- private void setOffset(long value) {
- assert (value & 0xFF) == 0;
- value >>= 8;
- offsetBase = (int) value;
- offset1 = (byte) (value >> 32);
- }
-
- public int getLength() {
- return length;
- }
-
-    protected CacheableDeserializer<Cacheable> deserializerReference() {
- return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);
- }
-
-    protected void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer) {
- this.deserialiserIndex = (byte) deserializer.getDeserialiserIdentifier();
- }
-
- public long getAccessCounter() {
- return accessCounter;
- }
-
- /**
- * Block has been accessed. Update its local access counter.
- */
- public void access(long accessCounter) {
- this.accessCounter = accessCounter;
- if (this.priority == BlockPriority.SINGLE) {
- this.priority = BlockPriority.MULTI;
- }
- }
-
- public BlockPriority getPriority() {
- return this.priority;
- }
-
- public long getCachedTime() {
- return cachedTime;
- }
-
- protected int getRefCount() {
- return 0;
- }
-
- protected int incrementRefCountAndGet() {
- return 0;
- }
-
- protected int decrementRefCountAndGet() {
- return 0;
- }
-
- protected boolean isMarkedForEvict() {
- return false;
- }
-
- protected void markForEvict() {
- // noop;
- }
- }
-
- static class SharedMemoryBucketEntry extends BucketEntry {
- private static final long serialVersionUID = -2187147283772338481L;
-
- // Set this when we were not able to forcefully evict the block
- private volatile boolean markedForEvict;
- private AtomicInteger refCount = new AtomicInteger(0);
-
- SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
- super(offset, length, accessCounter, inMemory);
- }
-
- @Override
- protected int getRefCount() {
- return this.refCount.get();
- }
-
- @Override
- protected int incrementRefCountAndGet() {
- return this.refCount.incrementAndGet();
- }
-
- @Override
- protected int decrementRefCountAndGet() {
- return this.refCount.decrementAndGet();
- }
-
- @Override
- protected boolean isMarkedForEvict() {
- return this.markedForEvict;
- }
-
- @Override
- protected void markForEvict() {
- this.markedForEvict = true;
- }
- }
-
/**
* Used to group bucket entries into priority buckets. There will be a
* BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
@@ -1489,8 +1280,9 @@ public class BucketCache implements BlockCache, HeapSize {
    // TODO avoid a cycling situation. We find no block which is not in use and so no way to free
// What to do then? Caching attempt fail? Need some changes in cacheBlock API?
while ((entry = queue.pollLast()) != null) {
- if (evictBlock(entry.getKey(), false)) {
- freedBytes += entry.getValue().getLength();
+ BucketEntry be = entry.getValue();
+ if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) {
+ freedBytes += be.getLength();
}
if (freedBytes >= toFree) {
return freedBytes;
@@ -1513,17 +1305,19 @@ public class BucketCache implements BlockCache, HeapSize {
*/
@VisibleForTesting
static class RAMQueueEntry {
- private BlockCacheKey key;
- private Cacheable data;
+ private final BlockCacheKey key;
+ private final Cacheable data;
private long accessCounter;
private boolean inMemory;
+ private final Recycler recycler;
- public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
- boolean inMemory) {
+ RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
+ Recycler recycler) {
this.key = bck;
this.data = data;
this.accessCounter = accessCounter;
this.inMemory = inMemory;
+ this.recycler = recycler;
}
public Cacheable getData() {
@@ -1538,31 +1332,19 @@ public class BucketCache implements BlockCache, HeapSize {
this.accessCounter = accessCounter;
}
- private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) {
- if (ioEngine.usesSharedMemory()) {
- if (UnsafeAvailChecker.isAvailable()) {
- return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
- } else {
- return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
- }
- } else {
- return new BucketEntry(offset, len, accessCounter, inMemory);
- }
- }
-
- public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
- final LongAdder realCacheSize)
- throws IOException {
+ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
+ final LongAdder realCacheSize) throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
return null;
}
- long offset = bucketAllocator.allocateBlock(len);
+ long offset = alloc.allocateBlock(len);
boolean succ = false;
- BucketEntry bucketEntry;
+ BucketEntry bucketEntry = null;
try {
- bucketEntry = getBucketEntry(ioEngine, offset, len);
+ bucketEntry =
+ new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler));
bucketEntry.setDeserialiserReference(data.getDeserializer());
if (data instanceof HFileBlock) {
// If an instance of HFileBlock, save on some allocations.
@@ -1580,7 +1362,7 @@ public class BucketCache implements BlockCache, HeapSize {
succ = true;
} finally {
if (!succ) {
- bucketAllocator.freeBlock(offset);
+ alloc.freeBlock(offset);
}
}
realCacheSize.add(len);
@@ -1696,25 +1478,11 @@ public class BucketCache implements BlockCache, HeapSize {
return null;
}
- @Override
- public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
- block.release();
- if (block.getMemoryType() == MemoryType.SHARED) {
- BucketEntry bucketEntry = backingMap.get(cacheKey);
- if (bucketEntry != null) {
- int refCount = bucketEntry.decrementRefCountAndGet();
- if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
- evictBlock(cacheKey);
- }
- }
- }
- }
-
@VisibleForTesting
- public int getRefCount(BlockCacheKey cacheKey) {
+ public int getRpcRefCount(BlockCacheKey cacheKey) {
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry != null) {
- return bucketEntry.getRefCount();
+ return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
}
return 0;
}
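The arithmetic follows from the ownership model: the backingMap owns one reference until the entry is marked evicted, so RPC references are whatever remains. A hypothetical standalone restatement with sample values:

```java
// rpcRefCount(3, false) == 2 : backingMap plus two RPC readers
// rpcRefCount(1, true)  == 1 : backingMap already released; one RPC reader lingers
static int rpcRefCount(int totalRefCnt, boolean markedAsEvicted) {
  return totalRefCnt - (markedAsEvicted ? 0 : 1);
}
```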
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
new file mode 100644
index 00000000000..b6e83d5d0b9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -0,0 +1,239 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hbase.io.hfile.BlockPriority;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
+import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
+import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.HBaseReferenceCounted;
+import org.apache.hadoop.hbase.nio.RefCnt;
+import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Item in cache. We expect this to be where most memory goes. Java uses 8 bytes just for object
+ * headers; after this, we want to use as little as possible - so we only use 8 bytes, but in order
+ * to do so we end up messing around with all this Java casting stuff. Offset stored as 5 bytes that
+ * make up the long. Doubt we'll see devices this big for ages. Offsets are divided by 256. So 5
+ * bytes gives us 256TB or so.
+ */
+@InterfaceAudience.Private
+class BucketEntry implements HBaseReferenceCounted {
+ // access counter comparator, descending order
+  static final Comparator<BucketEntry> COMPARATOR =
+      Comparator.comparingLong(BucketEntry::getAccessCounter).reversed();
+
+ private int offsetBase;
+ private int length;
+ private byte offset1;
+
+ /**
+ * The index of the deserializer that can deserialize this BucketEntry content. See
+ * {@link CacheableDeserializerIdManager} for hosting of index to serializers.
+ */
+ byte deserialiserIndex;
+
+ private volatile long accessCounter;
+ private BlockPriority priority;
+
+  /**
+   * The RefCnt means how many paths are referring to the {@link BucketEntry}; each RPC reading
+   * path is considered one path, and the {@link BucketCache#backingMap} reference is also
+   * considered a path. NOTICE that if two read RPC paths hit the same {@link BucketEntry}, then
+   * the HFileBlocks the two RPCs refer to will share the same refCnt instance with the
+   * BucketEntry, so the refCnt will increase or decrease as follows:
+   * 1. When the writerThread flushes the block into the IOEngine and adds the bucketEntry into
+   * the backingMap, the refCnt ++;
+   * 2. If the BucketCache evicts the block and moves the bucketEntry out of the backingMap, the
+   * refCnt --; this usually happens when an HFile is closing or someone forcibly calls
+   * clearBucketCache;
+   * 3. When a read RPC path starts to refer to the block backed by the memory area in the
+   * bucketEntry, the refCnt ++;
+   * 4. When the read RPC path has shipped the response and releases the block, the refCnt --;
+   * Once the refCnt decreases to zero, the {@link BucketAllocator} will free the block area.
+   */
+ private final RefCnt refCnt;
+ final AtomicBoolean markedAsEvicted;
+
+ /**
+ * Time this block was cached. Presumes we are created just before we are added to the cache.
+ */
+ private final long cachedTime = System.nanoTime();
+
+ BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
+ this(offset, length, accessCounter, inMemory, RefCnt.create());
+ }
+
+ BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt) {
+ setOffset(offset);
+ this.length = length;
+ this.accessCounter = accessCounter;
+ this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
+ this.refCnt = refCnt;
+ this.markedAsEvicted = new AtomicBoolean(false);
+ }
+
+ long offset() {
+ // Java has no unsigned numbers, so this needs the L cast otherwise it will be sign extended
+ // as a negative number.
+ long o = ((long) offsetBase) & 0xFFFFFFFFL;
+ // The 0xFF here does not need the L cast because it is treated as a positive int.
+ o += (((long) (offset1)) & 0xFF) << 32;
+ return o << 8;
+ }
+
+ private void setOffset(long value) {
+ assert (value & 0xFF) == 0;
+ value >>= 8;
+ offsetBase = (int) value;
+ offset1 = (byte) (value >> 32);
+ }
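A worked round trip of the 5-byte packing, assuming a 256-aligned offset as the assert requires:

```java
long offset = 0x1_2345_6789_00L;            // some 256-aligned bucket offset
long v = offset >> 8;                       // drop the 8 always-zero low bits
int offsetBase = (int) v;                   // low 32 bits
byte offset1 = (byte) (v >> 32);            // next 8 bits, 40 bits kept in total
long decoded = ((((long) offsetBase) & 0xFFFFFFFFL)
    + ((((long) offset1) & 0xFF) << 32)) << 8;
assert decoded == offset;                   // 40 bits << 8 addresses up to 256 TB
```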
+
+ public int getLength() {
+ return length;
+ }
+
+  CacheableDeserializer<Cacheable> deserializerReference() {
+ return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);
+ }
+
+  void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer) {
+ this.deserialiserIndex = (byte) deserializer.getDeserialiserIdentifier();
+ }
+
+ long getAccessCounter() {
+ return accessCounter;
+ }
+
+ /**
+ * Block has been accessed. Update its local access counter.
+ */
+ void access(long accessCounter) {
+ this.accessCounter = accessCounter;
+ if (this.priority == BlockPriority.SINGLE) {
+ this.priority = BlockPriority.MULTI;
+ }
+ }
+
+ public BlockPriority getPriority() {
+ return this.priority;
+ }
+
+ long getCachedTime() {
+ return cachedTime;
+ }
+
+  /**
+   * The {@link BucketCache} may try to release its reference to this BucketEntry several times.
+   * We must make sure this operation is idempotent; otherwise it would decrease the RPC's
+   * reference count prematurely, and an RPC memory leak would happen.
+   * @return true if we deallocate this entry successfully.
+   */
+ boolean markAsEvicted() {
+ if (markedAsEvicted.compareAndSet(false, true)) {
+ return this.release();
+ }
+ return false;
+ }
+
+  /**
+   * Mark as evicted only when there are NO RPC references. Mainly used for eviction when the
+   * cache size exceeds the max acceptable size.
+   * @return true if we deallocate this entry successfully.
+   */
+ boolean markStaleAsEvicted() {
+ if (!markedAsEvicted.get() && this.refCnt() == 1) {
+ // The only reference was coming from backingMap, now release the stale entry.
+ return this.markAsEvicted();
+ }
+ return false;
+ }
+
+  /**
+   * Check whether there are RPC paths referring to this block. There are two cases:
+   * 1. If the current refCnt is greater than 1, there must be at least one referring RPC path;
+   * 2. If the current refCnt is equal to 1 and markedAsEvicted is true, it means the backingMap
+   * has released its reference, so the remaining reference can only be from an RPC path.
+   * We use this check to decide whether we can free the block area: when the cached size exceeds
+   * the acceptable size, our eviction policy will choose those stale blocks without any RPC
+   * reference, and RPC-referred blocks will be excluded.
+   * @return true to indicate there are RPC paths referring to the block.
+   */
+ boolean isRpcRef() {
+ boolean evicted = markedAsEvicted.get();
+ return this.refCnt() > 1 || (evicted && refCnt() == 1);
+ }
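The two cases enumerate to a small truth table (illustrative values):

```java
// (refCnt, markedAsEvicted) -> isRpcRef()
// (1, false) -> false : only the backingMap reference; eligible for markStaleAsEvicted
// (2, false) -> true  : backingMap plus one RPC reader
// (1, true)  -> true  : backingMap released; a lingering RPC reader remains
// (3, true)  -> true  : several RPC readers still active
```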
+
+ Cacheable wrapAsCacheable(ByteBuffer[] buffers, MemoryType memoryType) throws IOException {
+ ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
+ return this.deserializerReference().deserialize(buf, true, memoryType);
+ }
+
+  interface BucketEntryHandler<T> {
+    T handle();
+  }
+
+  <T> T withWriteLock(IdReadWriteLock offsetLock, BucketEntryHandler<T> handler) {
+ ReentrantReadWriteLock lock = offsetLock.getLock(this.offset());
+ try {
+ lock.writeLock().lock();
+ return handler.handle();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
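Usage, as it appears elsewhere in this patch: every structural mutation of an entry runs under its per-offset write lock.

```java
// From BucketCache#evictBlock: release the backingMap's reference under the lock.
boolean evicted = be.withWriteLock(offsetLock, be::markAsEvicted);

// Handlers that only mutate state return null through the generic parameter:
previousEntry.withWriteLock(offsetLock, () -> {
  blockEvicted(key, previousEntry, false);
  return null;
});
```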
+
+ @Override
+ public int refCnt() {
+ return this.refCnt.refCnt();
+ }
+
+ @Override
+ public BucketEntry retain() {
+ refCnt.retain();
+ return this;
+ }
+
+  /**
+   * We have three cases to release the refCnt now:
+   * 1. BucketCache#evictBlock: it releases the backingMap's reference by force, because we're
+   * closing the file, clearing the bucket cache, or some corruption happened. Once all RPC
+   * references are gone, the area in the bucketAllocator is freed.
+   * 2. BucketCache#returnBlock: when the RPC has shipped, we release the block; only when the
+   * backingMap has also released its refCnt (case 1 will do this) and no other RPC references
+   * remain will the area in the bucketAllocator be freed.
+   * 3. Evicting blocks without any RPC reference when the cache size is exceeded: we'll only free
+   * those blocks with a zero RPC reference count, as {@link BucketEntry#markStaleAsEvicted()}
+   * does.
+   * @return true to indicate we've decreased the count to zero and done the de-allocation.
+   */
+ @Override
+ public boolean release() {
+ return refCnt.release();
+ }
+}
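Putting the release paths together, a hedged end-to-end sketch (`offset`, `len` and `bucketAllocator` are assumed to be in scope; the recycler stands in for freeing the bucket area):

```java
BucketEntry be = new BucketEntry(offset, len, /* accessCounter= */ 1, /* inMemory= */ false,
    RefCnt.create(() -> bucketAllocator.freeBlock(offset)));
// Writer thread publishes it: the initial refCnt of 1 is the backingMap's reference.
be.retain();         // an RPC reader starts using the block       -> refCnt == 2
be.markAsEvicted();  // HFile closed: backingMap's reference freed -> refCnt == 1
be.release();        // RPC response shipped: refCnt == 0, recycler frees the area
```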
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index 35daff7cdf2..72765de3547 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -50,9 +50,9 @@ final class BucketProtoUtils {
}
private static BucketCacheProtos.BackingMap toPB(
-      Map<BlockCacheKey, BucketCache.BucketEntry> backingMap) {
+      Map<BlockCacheKey, BucketEntry> backingMap) {
BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
-    for (Map.Entry<BlockCacheKey, BucketCache.BucketEntry> entry : backingMap.entrySet()) {
+    for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder()
.setKey(toPB(entry.getKey()))
.setValue(toPB(entry.getValue()))
@@ -101,7 +101,7 @@ final class BucketProtoUtils {
}
}
- private static BucketCacheProtos.BucketEntry toPB(BucketCache.BucketEntry entry) {
+ private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) {
return BucketCacheProtos.BucketEntry.newBuilder()
.setOffset(entry.offset())
.setLength(entry.getLength())
@@ -124,16 +124,16 @@ final class BucketProtoUtils {
}
}
-  static ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> fromPB(
+  static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(
       Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap)
       throws IOException {
-    ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> result = new ConcurrentHashMap<>();
+    ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
BucketCacheProtos.BucketEntry protoValue = entry.getValue();
- BucketCache.BucketEntry value = new BucketCache.BucketEntry(
+ BucketEntry value = new BucketEntry(
protoValue.getOffset(),
protoValue.getLength(),
protoValue.getAccessCounter(),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index fa8b1848cab..4e1b9139183 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferAllocator;
@@ -100,16 +99,15 @@ public class ByteBufferIOEngine implements IOEngine {
}
@Override
-  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
- throws IOException {
- ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);
+ public Cacheable read(BucketEntry be) throws IOException {
+ ByteBuffer[] buffers = bufferArray.asSubByteBuffers(be.offset(), be.getLength());
// Here the buffer that is created directly refers to the buffer in the actual buckets.
// When any cell is referring to the blocks created out of these buckets then it means that
// those cells are referring to a shared memory area which if evicted by the BucketCache would
// lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
// so that the readers using this block are aware of this fact and do the necessary action
// to prevent eviction till the results are either consumed or copied
- return deserializer.deserialize(dstBuffer, true, MemoryType.SHARED);
+ return be.wrapAsCacheable(buffers, MemoryType.SHARED);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
index 29721ab6593..d8c677c8fb9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
@@ -21,11 +21,9 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
import java.util.Comparator;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
@@ -43,6 +41,9 @@ import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue
@InterfaceAudience.Private
public class CachedEntryQueue {
+  private static final Comparator<Map.Entry<BlockCacheKey, BucketEntry>> COMPARATOR =
+      (a, b) -> BucketEntry.COMPARATOR.compare(a.getValue(), b.getValue());
+
  private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue;
private long cacheSize;
@@ -57,15 +58,7 @@ public class CachedEntryQueue {
if (initialSize == 0) {
initialSize++;
}
-    queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
-
-      @Override
-      public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
-          Entry<BlockCacheKey, BucketEntry> entry2) {
-        return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue());
- }
-
- }).expectedSize(initialSize).create();
+ queue = MinMaxPriorityQueue.orderedBy(COMPARATOR).expectedSize(initialSize).create();
cacheSize = 0;
this.maxSize = maxSize;
}
@@ -112,12 +105,4 @@ public class CachedEntryQueue {
  public Map.Entry<BlockCacheKey, BucketEntry> pollLast() {
return queue.pollLast();
}
-
- /**
- * Total size of all elements in this queue.
- * @return size of all elements currently in queue, in bytes
- */
- public long cacheSize() {
- return cacheSize;
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
index b8e29c62757..af749d7fc10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
@@ -16,19 +16,16 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
-import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
-
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
-import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * IO engine that stores data to a file on the local block device using memory mapping
- * mechanism
+ * IO engine that stores data to a file on the local block device using memory mapping mechanism
*/
@InterfaceAudience.Private
public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
@@ -38,10 +35,10 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
}
@Override
-  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
- throws IOException {
- ByteBuff dst = HEAP.allocate(length);
- bufferArray.read(offset, dst);
- return deserializer.deserialize(dst.position(0).limit(length), true, MemoryType.EXCLUSIVE);
+ public Cacheable read(BucketEntry be) throws IOException {
+ ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
+ bufferArray.read(be.offset(), dst);
+ dst.position(0).limit(be.getLength());
+ return be.wrapAsCacheable(dst.nioByteBuffers(), MemoryType.EXCLUSIVE);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index f6e49cf44ee..f5ab309b3ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -27,11 +27,11 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
-import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -121,30 +121,29 @@ public class FileIOEngine implements IOEngine {
/**
* Transfers data from file to the given byte buffer
- * @param offset The offset in the file where the first byte to be read
- * @param length The length of buffer that should be allocated for reading
- * from the file channel
- * @return number of bytes read
- * @throws IOException
+   * @param be a {@link BucketEntry} which maintains an (offset, len, refCnt)
+   * @return the {@link Cacheable} with the block data inside.
+   * @throws IOException if any IO error happens.
*/
@Override
-  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
- throws IOException {
+ public Cacheable read(BucketEntry be) throws IOException {
+ long offset = be.offset();
+ int length = be.getLength();
Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
ByteBuffer dstBuffer = ByteBuffer.allocate(length);
if (length != 0) {
accessFile(readAccessor, dstBuffer, offset);
// The buffer created out of the fileChannel is formed by copying the data from the file
- // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
- // this buffer from the file the data is already copied and there is no need to ensure that
- // the results are not corrupted before consuming them.
+ // Hence in this case there is no shared memory that we point to. Even if the BucketCache
+ // evicts this buffer from the file the data is already copied and there is no need to
+ // ensure that the results are not corrupted before consuming them.
if (dstBuffer.limit() != length) {
- throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
- + " expected");
+ throw new IllegalArgumentIOException(
+ "Only " + dstBuffer.limit() + " bytes read, " + length + " expected");
}
}
dstBuffer.rewind();
- return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
+ return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer }, MemoryType.EXCLUSIVE);
}
@VisibleForTesting
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java
index bd17fd52a33..ee37e916316 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java
@@ -24,7 +24,6 @@ import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferAllocator;
import org.apache.hadoop.hbase.util.ByteBufferArray;
@@ -101,8 +100,7 @@ public abstract class FileMmapIOEngine implements IOEngine {
}
@Override
-  public abstract Cacheable read(long offset, int length,
-      CacheableDeserializer<Cacheable> deserializer) throws IOException;
+ public abstract Cacheable read(BucketEntry be) throws IOException;
/**
* Transfers data from the given byte buffer to file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
index 87f71a52213..3ffb57ebcf0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
/**
@@ -48,15 +47,12 @@ public interface IOEngine {
/**
* Transfers data from IOEngine to a Cacheable object.
- * @param length How many bytes to be read from the offset
- * @param offset The offset in the IO engine where the first byte to be read
- * @param deserializer The deserializer to be used to make a Cacheable from the data.
- * @return Cacheable
- * @throws IOException
- * @throws RuntimeException when the length of the ByteBuff read is less than 'len'
+   * @param be maintains an (offset, len, refCnt) inside.
+   * @return Cacheable which will wrap the NIO ByteBuffers from the IOEngine.
+   * @throws IOException when any IO error happens
+   * @throws IllegalArgumentException when the length of the ByteBuff read is less than 'len'
*/
-  Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
-      throws IOException;
+ Cacheable read(BucketEntry be) throws IOException;
/**
* Transfers data from the given byte buffer to IOEngine
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/SharedMemoryMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/SharedMemoryMmapIOEngine.java
index b6a7a571f4f..bd83dd4a044 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/SharedMemoryMmapIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/SharedMemoryMmapIOEngine.java
@@ -18,11 +18,10 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
-import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
-import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -50,15 +49,14 @@ public class SharedMemoryMmapIOEngine extends FileMmapIOEngine {
}
@Override
-  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
- throws IOException {
- ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);
+ public Cacheable read(BucketEntry be) throws IOException {
+ ByteBuffer[] buffers = bufferArray.asSubByteBuffers(be.offset(), be.getLength());
// Here the buffer that is created directly refers to the buffer in the actual buckets.
// When any cell is referring to the blocks created out of these buckets then it means that
// those cells are referring to a shared memory area which if evicted by the BucketCache would
// lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
// so that the readers using this block are aware of this fact and do the necessary action
// to prevent eviction till the results are either consumed or copied
- return deserializer.deserialize(dstBuffer, true, MemoryType.SHARED);
+ return be.wrapAsCacheable(buffers, MemoryType.SHARED);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java
deleted file mode 100644
index 5d93e9715dc..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io.hfile.bucket;
-
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
-import org.apache.hadoop.hbase.util.UnsafeAccess;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import sun.misc.Unsafe;
-
-@InterfaceAudience.Private
-public class UnsafeSharedMemoryBucketEntry extends BucketEntry {
- private static final long serialVersionUID = 707544024564058801L;
-
- // We are just doing what AtomicInteger doing for the Atomic incrementAndGet/decrementAndGet.
- // We are avoiding the need to have a field of AtomicIneger type and have it as just int type.
- // We would like to reduce the head overhead per object of this type as much as possible.
- // Doing this direct Unsafe usage save us 16 bytes per Object.
- // ie Just using 4 bytes for int type than 20 bytes requirement for an AtomicInteger (16 bytes)
- // and 4 bytes reference to it.
- private static final Unsafe unsafe = UnsafeAccess.theUnsafe;
- private static final long refCountOffset;
-
- static {
- try {
- refCountOffset = unsafe
- .objectFieldOffset(UnsafeSharedMemoryBucketEntry.class.getDeclaredField("refCount"));
- } catch (Exception ex) {
- throw new Error(ex);
- }
- }
-
- // Set this when we were not able to forcefully evict the block
- private volatile boolean markedForEvict;
- private volatile int refCount = 0;
-
- public UnsafeSharedMemoryBucketEntry(long offset, int length, long accessCounter,
- boolean inMemory) {
- super(offset, length, accessCounter, inMemory);
- }
-
- @Override
- protected int getRefCount() {
- return this.refCount;
- }
-
- @Override
- protected int incrementRefCountAndGet() {
- return unsafe.getAndAddInt(this, refCountOffset, 1) + 1;
- }
-
- @Override
- protected int decrementRefCountAndGet() {
- return unsafe.getAndAddInt(this, refCountOffset, -1) - 1;
- }
-
- @Override
- protected boolean isMarkedForEvict() {
- return this.markedForEvict;
- }
-
- @Override
- protected void markForEvict() {
- this.markedForEvict = true;
- }
-}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index 5d852efa781..91bdcb7f9b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -441,9 +440,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -536,9 +535,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -670,9 +669,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -758,9 +757,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -925,9 +924,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -952,9 +951,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -1043,9 +1042,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -1079,9 +1078,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -1160,9 +1159,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -1186,9 +1185,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -1214,9 +1213,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -1293,9 +1292,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
- refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
- refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@@ -1562,8 +1561,6 @@ public class TestBlockEvictionFromClient {
}
public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
- static final AtomicLong sleepTime = new AtomicLong(0);
- static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
static final AtomicInteger countOfNext = new AtomicInteger(0);
static final AtomicInteger countOfGets = new AtomicInteger(0);
static final AtomicBoolean waitForGets = new AtomicBoolean(false);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index 6d6f2a720f1..97003e0284c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -170,16 +170,15 @@ public class CacheTestUtils {
}
- public static void hammerSingleKey(final BlockCache toBeTested,
- int BlockSize, int numThreads, int numQueries) throws Exception {
+ public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries)
+ throws Exception {
final BlockCacheKey key = new BlockCacheKey("key", 0);
final byte[] buf = new byte[5 * 1024];
Arrays.fill(buf, (byte) 5);
final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
Configuration conf = new Configuration();
- MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
- conf);
+ MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
final AtomicInteger totalQueries = new AtomicInteger();
toBeTested.cacheBlock(key, bac);
@@ -188,8 +187,8 @@ public class CacheTestUtils {
TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
- ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
- .getBlock(key, false, false, true);
+ ByteArrayCacheable returned =
+ (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
if (returned != null) {
assertArrayEquals(buf, returned.buf);
} else {
@@ -223,52 +222,6 @@ public class CacheTestUtils {
ctx.stop();
}
- public static void hammerEviction(final BlockCache toBeTested, int BlockSize,
- int numThreads, int numQueries) throws Exception {
-
- Configuration conf = new Configuration();
- MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
- conf);
-
- final AtomicInteger totalQueries = new AtomicInteger();
-
- for (int i = 0; i < numThreads; i++) {
- final int finalI = i;
-
- final byte[] buf = new byte[5 * 1024];
- TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
- @Override
- public void doAnAction() throws Exception {
- for (int j = 0; j < 100; j++) {
- BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
- Arrays.fill(buf, (byte) (finalI * j));
- final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
-
- ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
- .getBlock(key, true, false, true);
- if (gotBack != null) {
- assertArrayEquals(gotBack.buf, bac.buf);
- } else {
- toBeTested.cacheBlock(key, bac);
- }
- }
- totalQueries.incrementAndGet();
- }
- };
-
- t.setDaemon(true);
- ctx.addThread(t);
- }
-
- ctx.startThreads();
- while (totalQueries.get() < numQueries && ctx.shouldRun()) {
- Thread.sleep(10);
- }
- ctx.stop();
-
- assertTrue(toBeTested.getStats().getEvictedCount() > 0);
- }
-
public static class ByteArrayCacheable implements Cacheable {
static final CacheableDeserializer blockDeserializer =
@@ -405,8 +358,14 @@ public class CacheTestUtils {
destBuffer.clear();
cache.cacheBlock(key, blockToCache);
Cacheable actualBlock = cache.getBlock(key, false, false, false);
- actualBlock.serialize(destBuffer, true);
- assertEquals(expectedBuffer, destBuffer);
- cache.returnBlock(key, actualBlock);
+ try {
+ actualBlock.serialize(destBuffer, true);
+ assertEquals(expectedBuffer, destBuffer);
+ } finally {
+ // Release the reference count increased by getBlock.
+ if (actualBlock != null) {
+ actualBlock.release();
+ }
+ }
}
}
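CacheTestUtils now treats a successful getBlock as a reference the caller owns: the serialize-and-compare step runs inside a try block and the reference is dropped in finally, replacing the old returnBlock call. A sketch of the caller-side idiom these tests assume:

    // Sketch of the acquire/release contract assumed here: getBlock retains a
    // reference for the caller, which must be paired with exactly one
    // release(), even when the body throws.
    Cacheable actual = cache.getBlock(key, false, false, false);
    if (actual != null) {
      try {
        // ... read or serialize the block ...
      } finally {
        actual.release(); // drop the reference taken by getBlock
      }
    }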
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 1029a7777e0..121e0703ffe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -194,7 +194,7 @@ public class TestBucketCache {
@Test
public void testCacheMultiThreadedSingleKey() throws Exception {
- CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, 2 * NUM_THREADS, 2 * NUM_QUERIES);
+ CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
}
@Test
@@ -208,6 +208,7 @@ public class TestBucketCache {
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
Thread.sleep(100);
}
+ Thread.sleep(1000);
}
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
@@ -221,29 +222,28 @@ public class TestBucketCache {
@Test
public void testMemoryLeak() throws Exception {
final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
- cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
- new byte[10]));
+ cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
+ new CacheTestUtils.ByteArrayCacheable(new byte[10]));
long lockId = cache.backingMap.get(cacheKey).offset();
ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
lock.writeLock().lock();
Thread evictThread = new Thread("evict-block") {
-
@Override
public void run() {
cache.evictBlock(cacheKey);
}
-
};
evictThread.start();
cache.offsetLock.waitForWaiters(lockId, 1);
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
- cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
- new byte[10]));
+ assertEquals(0, cache.getBlockCount());
+ cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
+ new CacheTestUtils.ByteArrayCacheable(new byte[10]));
+ assertEquals(1, cache.getBlockCount());
lock.writeLock().unlock();
evictThread.join();
- assertEquals(1L, cache.getBlockCount());
- assertTrue(cache.getCurrentSize() > 0L);
- assertTrue("We should have a block!", cache.iterator().hasNext());
+ assertEquals(0, cache.getBlockCount());
+ assertEquals(0L, cache.getCurrentSize());
}
@Test
@@ -416,10 +416,10 @@ public class TestBucketCache {
@Test
public void testOffsetProducesPositiveOutput() {
- //This number is picked because it produces negative output if the values isn't ensured to be positive.
- //See HBASE-18757 for more information.
+ // This number is picked because it produces negative output if the value isn't ensured to be
+ // positive. See HBASE-18757 for more information.
long testValue = 549888460800L;
- BucketCache.BucketEntry bucketEntry = new BucketCache.BucketEntry(testValue, 10, 10L, true);
+ BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true);
assertEquals(testValue, bucketEntry.offset());
}
@@ -427,16 +427,15 @@ public class TestBucketCache {
public void testCacheBlockNextBlockMetadataMissing() throws Exception {
int size = 100;
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
- byte[] byteArr = new byte[length];
- ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
+ ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
HFileContext meta = new HFileContextBuilder().build();
ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
- HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
+ HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf1,
HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
- HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
+ HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf2,
HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
- BlockCacheKey key = new BlockCacheKey("key1", 0);
+ BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
ByteBuffer actualBuffer = ByteBuffer.allocate(length);
ByteBuffer block1Buffer = ByteBuffer.allocate(length);
ByteBuffer block2Buffer = ByteBuffer.allocate(length);
@@ -448,6 +447,8 @@ public class TestBucketCache {
block1Buffer);
waitUntilFlushedToBucket(cache, key);
+ assertNotNull(cache.backingMap.get(key));
+ assertEquals(1, cache.backingMap.get(key).refCnt());
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
@@ -456,9 +457,10 @@ public class TestBucketCache {
block1Buffer);
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
+ assertEquals(1, cache.backingMap.get(key).refCnt());
// Clear and add blockWithoutNextBlockMetadata
- cache.evictBlock(key);
+ assertTrue(cache.evictBlock(key));
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
@@ -494,8 +496,8 @@ public class TestBucketCache {
-1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-1, -1, -1, meta, ByteBuffAllocator.HEAP);
- RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
- RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
+ RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
+ RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
assertFalse(cache.containsKey(key1));
assertNull(cache.putIfAbsent(key1, re1));
@@ -542,7 +544,7 @@ public class TestBucketCache {
BucketAllocator allocator = new BucketAllocator(availableSpace, null);
BlockCacheKey key = new BlockCacheKey("dummy", 1L);
- RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
+ RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);
Assert.assertEquals(0, allocator.getUsedSize());
try {
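The TestBucketCache changes track the new RAMQueueEntry constructor, whose fifth argument is a recycler to run once the entry's backing buffer can be freed; the tests pass the no-op ByteBuffAllocator.NONE, while BucketCache itself passes createRecycler(cacheKey). A hedged sketch of the construction (argument values are illustrative):

    // Hedged sketch: the fifth argument is assumed to be a recycler invoked
    // when the entry's buffer is released; ByteBuffAllocator.NONE is a no-op.
    RAMQueueEntry re = new RAMQueueEntry(key, block, /* accessCounter = */ 1,
        /* inMemory = */ false, /* recycler = */ ByteBuffAllocator.NONE);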
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
new file mode 100644
index 00000000000..1dcd2a29c42
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestBucketCacheRefCnt {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBucketCacheRefCnt.class);
+
+ private static final String IO_ENGINE = "offheap";
+ private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
+ private static final int BLOCK_SIZE = 1024;
+ private static final int[] BLOCK_SIZE_ARRAY =
+ new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
+ private static final String PERSISTENCE_PATH = null;
+ private static final HFileContext CONTEXT = new HFileContextBuilder().build();
+
+ private BucketCache cache;
+
+ private static BucketCache create(int writerSize, int queueSize) throws IOException {
+ return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
+ queueSize, PERSISTENCE_PATH);
+ }
+
+ private static HFileBlock createBlock(int offset, int size) {
+ return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuffer.allocate(size),
+ HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, HEAP);
+ }
+
+ private static BlockCacheKey createKey(String hfileName, long offset) {
+ return new BlockCacheKey(hfileName, offset);
+ }
+
+ private void disableWriter() {
+ if (cache != null) {
+ for (WriterThread wt : cache.writerThreads) {
+ wt.disableWriter();
+ wt.interrupt();
+ }
+ }
+ }
+
+ @Test
+ public void testBlockInRAMCache() throws IOException {
+ cache = create(1, 1000);
+ disableWriter();
+ try {
+ for (int i = 0; i < 10; i++) {
+ HFileBlock blk = createBlock(i, 1020);
+ BlockCacheKey key = createKey("testHFile-00", i);
+ assertEquals(1, blk.refCnt());
+ cache.cacheBlock(key, blk);
+ assertEquals(i + 1, cache.getBlockCount());
+ assertEquals(2, blk.refCnt());
+
+ Cacheable block = cache.getBlock(key, false, false, false);
+ try {
+ assertEquals(3, blk.refCnt());
+ assertEquals(3, block.refCnt());
+ assertEquals(blk, block);
+ } finally {
+ block.release();
+ }
+ assertEquals(2, blk.refCnt());
+ assertEquals(2, block.refCnt());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ BlockCacheKey key = createKey("testHFile-00", i);
+ Cacheable blk = cache.getBlock(key, false, false, false);
+ assertEquals(3, blk.refCnt());
+ assertFalse(blk.release());
+ assertEquals(2, blk.refCnt());
+
+ assertTrue(cache.evictBlock(key));
+ assertEquals(1, blk.refCnt());
+ assertTrue(blk.release());
+ assertEquals(0, blk.refCnt());
+ }
+ } finally {
+ cache.shutdown();
+ }
+ }
+
+ private void waitUntilFlushedToCache(BlockCacheKey key) throws InterruptedException {
+ while (!cache.backingMap.containsKey(key) || cache.ramCache.containsKey(key)) {
+ Thread.sleep(100);
+ }
+ Thread.sleep(1000);
+ }
+
+ @Test
+ public void testBlockInBackingMap() throws Exception {
+ cache = create(1, 1000);
+ try {
+ HFileBlock blk = createBlock(200, 1020);
+ BlockCacheKey key = createKey("testHFile-00", 200);
+ cache.cacheBlock(key, blk);
+ waitUntilFlushedToCache(key);
+ assertEquals(1, blk.refCnt());
+
+ Cacheable block = cache.getBlock(key, false, false, false);
+ assertTrue(block.getMemoryType() == MemoryType.SHARED);
+ assertTrue(block instanceof HFileBlock);
+ assertEquals(2, block.refCnt());
+
+ block.retain();
+ assertEquals(3, block.refCnt());
+
+ Cacheable newBlock = cache.getBlock(key, false, false, false);
+ assertTrue(newBlock.getMemoryType() == MemoryType.SHARED);
+ assertTrue(newBlock instanceof HFileBlock);
+ assertEquals(4, newBlock.refCnt());
+
+ // release the newBlock
+ assertFalse(newBlock.release());
+ assertEquals(3, newBlock.refCnt());
+ assertEquals(3, block.refCnt());
+
+ // Evict the key
+ cache.evictBlock(key);
+ assertEquals(2, block.refCnt());
+
+ // Evict again, shouldn't change the refCnt.
+ cache.evictBlock(key);
+ assertEquals(2, block.refCnt());
+
+ assertFalse(block.release());
+ assertEquals(1, block.refCnt());
+
+ newBlock = cache.getBlock(key, false, false, false);
+ assertEquals(2, block.refCnt());
+ assertEquals(2, newBlock.refCnt());
+
+ // Release the block
+ assertFalse(block.release());
+ assertEquals(1, block.refCnt());
+
+ // Release the newBlock.
+ assertTrue(newBlock.release());
+ assertEquals(0, newBlock.refCnt());
+ } finally {
+ cache.shutdown();
+ }
+ }
+
+ @Test
+ public void testInBucketCache() throws IOException {
+ cache = create(1, 1000);
+ try {
+ HFileBlock blk = createBlock(200, 1020);
+ BlockCacheKey key = createKey("testHFile-00", 200);
+ cache.cacheBlock(key, blk);
+ assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);
+
+ Cacheable block1 = cache.getBlock(key, false, false, false);
+ assertTrue(block1.refCnt() >= 2);
+
+ Cacheable block2 = cache.getBlock(key, false, false, false);
+ assertTrue(block2.refCnt() >= 3);
+
+ cache.evictBlock(key);
+ assertTrue(blk.refCnt() >= 1);
+ assertTrue(block1.refCnt() >= 2);
+ assertTrue(block2.refCnt() >= 2);
+
+ // Get key again
+ Cacheable block3 = cache.getBlock(key, false, false, false);
+ if (block3 != null) {
+ assertTrue(block3.refCnt() >= 3);
+ assertFalse(block3.release());
+ }
+
+ blk.release();
+ boolean ret1 = block1.release();
+ boolean ret2 = block2.release();
+ assertTrue(ret1 || ret2);
+ assertEquals(0, blk.refCnt());
+ assertEquals(0, block1.refCnt());
+ assertEquals(0, block2.refCnt());
+ } finally {
+ cache.shutdown();
+ }
+ }
+
+ @Test
+ public void testMarkStaleAsEvicted() throws Exception {
+ cache = create(1, 1000);
+ try {
+ HFileBlock blk = createBlock(200, 1020);
+ BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
+ cache.cacheBlock(key, blk);
+ waitUntilFlushedToCache(key);
+ assertEquals(1, blk.refCnt());
+ assertNotNull(cache.backingMap.get(key));
+ assertEquals(1, cache.backingMap.get(key).refCnt());
+
+ // An RPC now references this block.
+ Cacheable block1 = cache.getBlock(key, false, false, false);
+ assertEquals(2, block1.refCnt());
+ BucketEntry be1 = cache.backingMap.get(key);
+ assertNotNull(be1);
+ assertEquals(2, be1.refCnt());
+
+ // We still hold an RPC reference, so marking it stale won't have any effect.
+ assertFalse(be1.markStaleAsEvicted());
+ assertEquals(2, block1.refCnt());
+ assertEquals(2, cache.backingMap.get(key).refCnt());
+
+ // Release the RPC reference.
+ block1.release();
+ assertEquals(1, block1.refCnt());
+ assertEquals(1, cache.backingMap.get(key).refCnt());
+
+ // Mark the stale entry as evicted again; with no RPC reference left, it'll do the de-allocation.
+ assertTrue(be1.markStaleAsEvicted());
+ assertEquals(0, block1.refCnt());
+ assertNull(cache.backingMap.get(key));
+ assertEquals(0, cache.size());
+ } finally {
+ cache.shutdown();
+ }
+ }
+}
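The new TestBucketCacheRefCnt pins down the reference-counting lifecycle end to end. The invariant its assertions encode, roughly: the creator holds one reference, the RAM cache holds one while the block awaits flushing, each getBlock or retain adds one, each release removes one, and eviction plus the final release de-allocates. Condensed as a sketch (counts follow testBlockInRAMCache above):

    BlockCacheKey key = createKey("testHFile-00", 0);
    HFileBlock blk = createBlock(0, 1020);          // refCnt == 1: creator's reference
    cache.cacheBlock(key, blk);                     // refCnt == 2: RAM cache retains
    Cacheable rpc = cache.getBlock(key, false, false, false); // refCnt == 3: RPC retains
    rpc.release();                                  // refCnt == 2: RPC done
    cache.evictBlock(key);                          // refCnt == 1: cache drops its reference
    blk.release();                                  // refCnt == 0: buffer can be recycled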
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index 746cf8db1b1..d6a007797ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index a06d86d7639..2f8c8388daa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
+import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -34,13 +35,46 @@ import org.junit.experimental.categories.Category;
/**
* Basic test for {@link ByteBufferIOEngine}
*/
-@Category({IOTests.class, SmallTests.class})
+@Category({ IOTests.class, SmallTests.class })
public class TestByteBufferIOEngine {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBufferIOEngine.class);
+ /**
+ * Subclass {@link BucketEntry} so that we can return an arbitrary offset.
+ */
+ private static class MockBucketEntry extends BucketEntry {
+ private long off;
+
+ MockBucketEntry(long offset, int length) {
+ super(offset & 0xFF00, length, 0, false);
+ this.off = offset;
+ }
+
+ @Override
+ long offset() {
+ return this.off;
+ }
+ }
+
+ private static final BufferGrabbingDeserializer DESERIALIZER = new BufferGrabbingDeserializer();
+ static {
+ int id = CacheableDeserializerIdManager.registerDeserializer(DESERIALIZER);
+ DESERIALIZER.setIdentifier(id);
+ }
+
+ static BucketEntry createBucketEntry(long offset, int len) {
+ BucketEntry be = new MockBucketEntry(offset, len);
+ be.setDeserialiserReference(DESERIALIZER);
+ return be;
+ }
+
+ static ByteBuff getByteBuff(BucketEntry be) {
+ return ((BufferGrabbingDeserializer) be.deserializerReference()).buf;
+ }
+
@Test
public void testByteBufferIOEngine() throws Exception {
int capacity = 32 * 1024 * 1024; // 32 MB
@@ -71,9 +105,9 @@ public class TestByteBufferIOEngine {
ioEngine.write(src, offset);
src.position(pos).limit(lim);
- BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
- ioEngine.read(offset, blockSize, deserializer);
- ByteBuff dst = deserializer.buf;
+ BucketEntry be = createBucketEntry(offset, blockSize);
+ ioEngine.read(be);
+ ByteBuff dst = getByteBuff(be);
Assert.assertEquals(src.remaining(), blockSize);
Assert.assertEquals(dst.remaining(), blockSize);
Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
@@ -85,10 +119,11 @@ public class TestByteBufferIOEngine {
/**
* A CacheableDeserializer implementation which just stores a reference to the {@link ByteBuff} to be
- * deserialized. Use {@link #getDeserializedByteBuff()} to get this reference.
+ * deserialized.
*/
static class BufferGrabbingDeserializer implements CacheableDeserializer {
private ByteBuff buf;
+ private int identifier;
@Override
public Cacheable deserialize(ByteBuff b) throws IOException {
@@ -102,13 +137,13 @@ public class TestByteBufferIOEngine {
return null;
}
- @Override
- public int getDeserialiserIdentifier() {
- return 0;
+ public void setIdentifier(int identifier) {
+ this.identifier = identifier;
}
- public ByteBuff getDeserializedByteBuff() {
- return this.buf;
+ @Override
+ public int getDeserialiserIdentifier() {
+ return identifier;
}
}
@@ -151,9 +186,9 @@ public class TestByteBufferIOEngine {
ioEngine.write(src, offset);
src.position(pos).limit(lim);
- BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
- ioEngine.read(offset, blockSize, deserializer);
- ByteBuff dst = deserializer.buf;
+ BucketEntry be = createBucketEntry(offset, blockSize);
+ ioEngine.read(be);
+ ByteBuff dst = getByteBuff(be);
Assert.assertEquals(src.remaining(), blockSize);
Assert.assertEquals(dst.remaining(), blockSize);
Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
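The IOEngine read API now takes a BucketEntry rather than (offset, length, deserializer), so the entry must be able to resolve its deserializer by id. The registration handshake the test sets up in its static initializer, as a standalone sketch:

    // Deserializers are registered globally so a BucketEntry can store just an
    // id and resolve it back to the instance at read time.
    BufferGrabbingDeserializer d = new BufferGrabbingDeserializer();
    int id = CacheableDeserializerIdManager.registerDeserializer(d);
    d.setIdentifier(id); // the entry persists only the id, not the instance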
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java
index 79d58f0a994..9b51b655a4d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
+import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.createBucketEntry;
+import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.getByteBuff;
+
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -57,9 +59,9 @@ public class TestExclusiveMemoryMmapEngine {
src.position(pos).limit(lim);
// read
- BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
- fileMmapEngine.read(offset, len, deserializer);
- ByteBuff dst = deserializer.getDeserializedByteBuff();
+ BucketEntry be = createBucketEntry(offset, len);
+ fileMmapEngine.read(be);
+ ByteBuff dst = getByteBuff(be);
Assert.assertEquals(src.remaining(), len);
Assert.assertEquals(dst.remaining(), len);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 6b0d603abcb..6bd91d0d221 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
+import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.createBucketEntry;
+import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.getByteBuff;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -29,7 +31,6 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -110,9 +111,10 @@ public class TestFileIOEngine {
data1[j] = (byte) (Math.random() * 255);
}
fileIOEngine.write(ByteBuffer.wrap(data1), offset);
- BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
- fileIOEngine.read(offset, len, deserializer);
- ByteBuff data2 = deserializer.getDeserializedByteBuff();
+
+ BucketEntry be = createBucketEntry(offset, len);
+ fileIOEngine.read(be);
+ ByteBuff data2 = getByteBuff(be);
assertArrayEquals(data1, data2.array());
}
}
@@ -122,9 +124,9 @@ public class TestFileIOEngine {
byte[] data1 = new byte[0];
fileIOEngine.write(ByteBuffer.wrap(data1), 0);
- BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
- fileIOEngine.read(0, 0, deserializer);
- ByteBuff data2 = deserializer.getDeserializedByteBuff();
+ BucketEntry be = createBucketEntry(0, 0);
+ fileIOEngine.read(be);
+ ByteBuff data2 = getByteBuff(be);
assertArrayEquals(data1, data2.array());
}
@@ -140,9 +142,9 @@ public class TestFileIOEngine {
fileIOEngine.write(src, offset);
src.position(pos).limit(lim);
- BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
- fileIOEngine.read(offset, len, deserializer);
- ByteBuff dst = deserializer.getDeserializedByteBuff();
+ BucketEntry be = createBucketEntry(offset, len);
+ fileIOEngine.read(be);
+ ByteBuff dst = getByteBuff(be);
Assert.assertEquals(src.remaining(), len);
Assert.assertEquals(dst.remaining(), len);
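With the helpers shared from TestByteBufferIOEngine, all three engines (ByteBuffer, mmap, file) now exercise the same read round-trip; its common shape, as a sketch:

    // Common round-trip shared by the engine tests: write raw bytes, then read
    // them back through a BucketEntry whose deserializer grabs the ByteBuff.
    ioEngine.write(src, offset);
    BucketEntry be = createBucketEntry(offset, len);
    ioEngine.read(be);
    ByteBuff dst = getByteBuff(be);
    assertEquals(len, dst.remaining()); // the engine returned exactly the bytes written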